DRILL-8437: Add Header Index Pagination (#2806)
diff --git a/contrib/storage-http/.gitignore b/contrib/storage-http/.gitignore
index 710368b..05c3e7a 100644
--- a/contrib/storage-http/.gitignore
+++ b/contrib/storage-http/.gitignore
@@ -1 +1 @@
-./src/test/resources/logback-test.xml
+/src/test/resources/logback-test.xml
diff --git a/contrib/storage-http/Pagination.md b/contrib/storage-http/Pagination.md
index f555060..0647dbc 100644
--- a/contrib/storage-http/Pagination.md
+++ b/contrib/storage-http/Pagination.md
@@ -76,3 +76,15 @@
** Note: Index / Keyset Pagination is only implemented for APIs that return JSON **
+
+## Header Index Pagination
+Header index pagination is used when the API in question returns a link to the next page in the response header. Shopify is one such example of an API that does this.
+
+The only configuration option is the `nextPageParam` which is the parameter that Drill should look for in the response header.
+
+```json
+ "paginator": {
+ "nextPageParam": "page",
+ "method": "HEADER_INDEX"
+ }
+```
\ No newline at end of file
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index e6661ad..85d8073 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -51,6 +51,7 @@
import java.io.File;
import java.io.InputStream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -118,6 +119,7 @@
.scanDefn(subScan)
.url(url)
.tempDir(new File(tempDirPath))
+ .paginator(paginator)
.proxyConfig(proxySettings(negotiator.drillConfig(), url))
.errorContext(errorContext)
.build();
@@ -225,7 +227,7 @@
protected Map<String, Object> generatePaginationFieldMap() {
if (paginator == null || paginator.getMode() != PaginatorMethod.INDEX) {
- return null;
+ return Collections.emptyMap();
}
Map<String, Object> fieldMap = new HashMap<>();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index df55433..ab710e9 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -99,6 +99,7 @@
.scanDefn(subScan)
.url(url)
.tempDir(new File(tempDirPath))
+ .paginator(paginator)
.proxyConfig(proxySettings(negotiator.drillConfig(), url))
.errorContext(errorContext)
.build();
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
index e6b226f..9eb2f3d 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpPaginatorConfig.java
@@ -133,10 +133,18 @@
.build(logger);
}
break;
+ case HEADER_INDEX:
+ if (StringUtils.isEmpty(this.nextPageParam)) {
+ throw UserException
+ .validationError()
+ .message("Invalid paginator configuration. For HEADER_INDEX pagination, the nextPageParam must be defined.")
+ .build(logger);
+ }
+ break;
default:
throw UserException
.validationError()
- .message("Invalid paginator method: %s. Drill supports 'OFFSET', 'INDEX' and 'PAGE'", method)
+ .message("Invalid paginator method: %s. Drill supports 'OFFSET', 'INDEX', 'HEADER_INDEX' and 'PAGE'", method)
.build(logger);
}
}
@@ -230,7 +238,8 @@
public enum PaginatorMethod {
OFFSET,
PAGE,
- INDEX
+ INDEX,
+ HEADER_INDEX
}
@JsonIgnore
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
index 0da06a4..d5f277f 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanBatchCreator.java
@@ -34,6 +34,7 @@
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator;
import org.apache.drill.exec.store.http.paginator.IndexPaginator;
import org.apache.drill.exec.store.http.paginator.OffsetPaginator;
import org.apache.drill.exec.store.http.paginator.PagePaginator;
@@ -92,7 +93,7 @@
private static class HttpReaderFactory implements ReaderFactory {
private final HttpSubScan subScan;
private final HttpPaginatorConfig paginatorConfig;
- private Paginator paginator;
+ private final Paginator paginator;
private int count;
@@ -105,6 +106,8 @@
// Initialize the paginator and generate the base URLs
this.paginator = getPaginator();
+ } else {
+ this.paginator = null;
}
}
@@ -119,8 +122,6 @@
rawUrl = HttpUrl.parse(subScan.tableSpec().connectionConfig().url());
}
-
-
// If the URL is not parsable or otherwise invalid
if (rawUrl == null) {
throw UserException.validationError()
@@ -130,28 +131,33 @@
urlBuilder = rawUrl.newBuilder();
- Paginator paginator = null;
if (paginatorConfig.getMethodType() == PaginatorMethod.OFFSET) {
- paginator = new OffsetPaginator(urlBuilder,
+ return new OffsetPaginator(urlBuilder,
subScan.maxRecords(),
paginatorConfig.pageSize(),
paginatorConfig.limitParam(),
paginatorConfig.offsetParam());
} else if (paginatorConfig.getMethodType() == PaginatorMethod.PAGE) {
- paginator = new PagePaginator(urlBuilder,
+ return new PagePaginator(urlBuilder,
subScan.maxRecords(),
paginatorConfig.pageSize(),
paginatorConfig.pageParam(),
paginatorConfig.pageSizeParam());
} else if (paginatorConfig.getMethodType() == PaginatorMethod.INDEX) {
- paginator = new IndexPaginator(urlBuilder,
+ return new IndexPaginator(urlBuilder,
0, // Page size not used for Index/Keyset pagination
subScan.maxRecords(),
paginatorConfig.hasMoreParam(),
paginatorConfig.indexParam(),
paginatorConfig.nextPageParam());
+ } else if (paginatorConfig.getMethodType() == PaginatorMethod.HEADER_INDEX) {
+ return new HeaderIndexPaginator(urlBuilder,
+ subScan.maxRecords(),
+ paginatorConfig.pageSize(),
+ paginatorConfig.nextPageParam(),
+ subScan.tableSpec().connectionConfig().url());
}
- return paginator;
+ return null;
}
@Override
@@ -181,6 +187,7 @@
* the group scan such that the calls could be sent to different drillbits.
*/
if (!paginator.hasNext()) {
+ logger.debug("Ending Batch Generation.");
return null;
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java
new file mode 100644
index 0000000..8bf388f
--- /dev/null
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/HeaderIndexPaginator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.http.paginator;
+
+import okhttp3.Headers;
+import okhttp3.HttpUrl.Builder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The Header Index Paginator is used when the API in question send a link in the HTTP header
+ * containing the URL for the next page.
+ */
+public class HeaderIndexPaginator extends Paginator {
+
+ private static final Logger logger = LoggerFactory.getLogger(HeaderIndexPaginator.class);
+ private static final Pattern URL_REGEX = Pattern.compile("(https?:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*))");
+
+ private final String nextPageParam;
+ private final String firstPageURL;
+ private Headers headers;
+ private boolean firstPage;
+ private int pageCount;
+
+ public HeaderIndexPaginator(Builder builder, int pageSize, int limit, String nextPageParam, String firstPageURL) {
+ super(builder, PaginatorMethod.HEADER_INDEX, pageSize, limit);
+ this.nextPageParam = nextPageParam;
+ this.firstPageURL = firstPageURL;
+ this.firstPage = true;
+ this.pageCount = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ // If the headers are null and it isn't the first page, end pagination
+ if ( !firstPage &&
+ (headers == null || StringUtils.isEmpty(headers.get(nextPageParam)))
+ ) {
+ notifyPartialPage();
+ logger.debug("Ending pagination. No additional info in headers.");
+ return false;
+ }
+
+ return !partialPageReceived;
+ }
+
+ /**
+ * This method sets the headers for the Header Index Paginator. This must be called with updated headers
+ * before the {@link #next()} method is called.
+ * @param headers A {@link Headers} object containing the response headers from the previous API call.
+ */
+ public void setResponseHeaders(Headers headers) {
+ logger.debug("Setting response headers. ");
+ this.headers = headers;
+
+ // If the next page URL is empty or otherwise undefined, halt pagination.
+ if (StringUtils.isEmpty(headers.get(nextPageParam))) {
+ notifyPartialPage();
+ }
+ }
+
+ @Override
+ public String next() {
+ pageCount++;
+ if (firstPage) {
+ firstPage = false;
+ return firstPageURL;
+ }
+
+ if (headers == null) {
+ throw UserException.dataReadError()
+ .message("Headers are empty. HeaderIndex Pagination requires parameters that are passed in the HTTP header." + pageCount)
+ .build(logger);
+ }
+ // Now attempt to retrieve the field from the response headers.
+ String nextPage = headers.get(nextPageParam);
+
+ // If the next page value is null or empty, halt pagination
+ if (StringUtils.isEmpty(nextPage)) {
+ super.notifyPartialPage();
+ return null;
+ }
+
+ logger.debug("Found next page URL: {}", nextPage);
+
+ // Clean up any extraneous garbage from the header field.
+ Matcher urlMatcher = URL_REGEX.matcher(nextPage);
+ if (urlMatcher.find()) {
+ return urlMatcher.group(1);
+ }
+
+ return nextPage;
+ }
+}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
index b640d3a..a7bd7e5 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/paginator/PagePaginator.java
@@ -28,7 +28,7 @@
public class PagePaginator extends Paginator {
- private static final Logger logger = LoggerFactory.getLogger(OffsetPaginator.class);
+ private static final Logger logger = LoggerFactory.getLogger(PagePaginator.class);
private final int limit;
private final String pageParam;
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 3568fe9..6ba9918 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -50,9 +50,11 @@
import org.apache.drill.exec.store.http.HttpApiConfig;
import org.apache.drill.exec.store.http.HttpApiConfig.HttpMethod;
import org.apache.drill.exec.store.http.HttpApiConfig.PostLocation;
+import org.apache.drill.exec.store.http.HttpPaginatorConfig.PaginatorMethod;
import org.apache.drill.exec.store.http.HttpStoragePlugin;
import org.apache.drill.exec.store.http.HttpStoragePluginConfig;
import org.apache.drill.exec.store.http.HttpSubScan;
+import org.apache.drill.exec.store.http.paginator.HeaderIndexPaginator;
import org.apache.drill.exec.store.http.paginator.Paginator;
import org.apache.drill.exec.store.http.oauth.AccessTokenAuthenticator;
import org.apache.drill.exec.store.http.oauth.AccessTokenInterceptor;
@@ -110,8 +112,6 @@
.writeTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
.readTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS)
.build();
-
-
private final OkHttpClient client;
private final File tempDir;
private final HttpProxyConfig proxyConfig;
@@ -436,6 +436,11 @@
logger.debug("HTTP Request for {} successful.", url());
logger.debug("Response Headers: {} ", response.headers());
+ // In the case of Header Index Pagination, send the header(s) to the paginator
+ if (paginator != null && paginator.getMode() == PaginatorMethod.HEADER_INDEX) {
+ ((HeaderIndexPaginator)paginator).setResponseHeaders(response.headers());
+ }
+
// Return the InputStream of the response. Note that it is necessary and
// and sufficient that the caller invokes close() on the returned stream.
return Objects.requireNonNull(response.body()).byteStream();
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index afa5fe3..e5294a9 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -164,6 +164,20 @@
headers.put("header1", "value1");
headers.put("header2", "value2");
+ HttpPaginatorConfig headerIndexPaginator = HttpPaginatorConfig.builder()
+ .nextPageParam("link")
+ .pageSize(10)
+ .method("header_index")
+ .build();
+
+ HttpApiConfig mockJsonConfigWithHeaderIndex = HttpApiConfig.builder()
+ .url("http://localhost:8092/json")
+ .method("get")
+ .requireTail(false)
+ .paginator(headerIndexPaginator)
+ .inputType("json")
+ .build();
+
HttpPaginatorConfig offsetPaginatorForJson = HttpPaginatorConfig.builder()
.limitParam("limit")
@@ -329,6 +343,7 @@
configs.put("json_tail", mockJsonConfigWithPaginatorAndTail);
configs.put("xml_paginator", mockXmlConfigWithPaginator);
configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
+ configs.put("customers", mockJsonConfigWithHeaderIndex);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
new HttpStoragePluginConfig(false, configs, 2,1000, null, null, "", 80, "", "", "", null,
@@ -339,6 +354,28 @@
@Test
+ public void testPagePaginationWithHeaderIndex() throws Exception {
+ String sql = "SELECT col1, _response_url FROM `local`.`customers`";
+ try (MockWebServer server = startServer()) {
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1).setHeader("link", "http://localhost:8092/json?page=2"));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2).setHeader("link", "http://localhost:8092/json?page=3"));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE3));
+
+ List<QueryDataBatch> results = client.queryBuilder()
+ .sql(sql)
+ .results();
+
+ int count = 0;
+ for(QueryDataBatch b : results){
+ count += b.getHeader().getRowCount();
+ b.release();
+ }
+
+ assertEquals(3, results.size());
+ }
+ }
+
+ @Test
@Ignore("Requires Live Connection to Github")
public void testPagePaginationWithURLParameters() throws Exception {
String sql = "SELECT * FROM live.github WHERE org='apache' LIMIT 15";