DRILL-8342: Add Automatic Retry for Rate Limited APIs (#2691)
diff --git a/contrib/storage-http/Pagination.md b/contrib/storage-http/Pagination.md
index 8855fb2..f555060 100644
--- a/contrib/storage-http/Pagination.md
+++ b/contrib/storage-http/Pagination.md
@@ -1,12 +1,18 @@
# Auto Pagination in Drill
-Remote APIs frequently implement some sort of pagination as a way of limiting results. However, if you are performing bulk data analysis, it is necessary to reassemble the
+Remote APIs frequently implement some sort of pagination as a way of limiting results. However, if you are performing bulk data analysis, it is necessary to reassemble the
data into one larger dataset. Drill's auto-pagination features allow this to happen in the background, so that the user will get clean data back.
-To use a paginator, you simply have to configure the paginator in the connection for the particular API.
+To use a paginator, you simply have to configure the paginator in the connection for the particular API.
## Words of Caution
While extremely powerful, the auto-pagination feature has the potential to run afoul of APIs rate limits and even potentially DDoS an API. Please use with extreme care.
+## Rate Limits
+When using automatic pagination, you may encounter APIs that have burst limits or other limits
+as to the maximum number of requests in a minute or other amount of time. Drill allows you to
+set a `retryDelay` parameter which is the number of milliseconds that Drill should wait before
+resending the request. This defaults to 1 second. This option is set in the configuration for
+the HTTP plugin.
## Offset Pagination
Offset Pagination uses commands similar to SQL which has a `LIMIT` and an `OFFSET`. With an offset paginator, let's say you want 200 records and the page size is 50 records, the offset paginator will break up your query into 4 requests as shown below:
@@ -17,7 +23,7 @@
* myapi.com?limit=50&offset=150
### Configuring Offset Pagination
-To configure an offset paginator, simply add the following to the configuration for your connection.
+To configure an offset paginator, simply add the following to the configuration for your connection.
```json
"paginator": {
@@ -29,7 +35,7 @@
```
## Page Pagination
-Page pagination is very similar to offset pagination except instead of using an `OFFSET` it uses a page number.
+Page pagination is very similar to offset pagination except instead of using an `OFFSET` it uses a page number.
```json
"paginator": {
@@ -42,9 +48,9 @@
In either case, the `pageSize` parameter should be set to the maximum page size allowable by the API. This will minimize the number of requests Drill is making.
## Index / KeySet Pagination
-Index or KeySet pagination is when the API itself returns values to generate the next page.
+Index or KeySet pagination is when the API itself returns values to generate the next page.
-Consider an API that returned data like this:
+Consider an API that returned data like this:
```json
{
@@ -69,4 +75,4 @@
* `nextPageParam`: The parameter name which returns a complete URL of the next page.
-** Note: Index / Keyset Pagination is only implemented for APIs that return JSON **
+** Note: Index / Keyset Pagination is only implemented for APIs that return JSON **
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
index c19b5b2..5c38430 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpStoragePluginConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.http;
+import com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.OAuthConfig;
@@ -42,8 +43,10 @@
@JsonTypeName(HttpStoragePluginConfig.NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class HttpStoragePluginConfig extends StoragePluginConfig {
private static final Logger logger = LoggerFactory.getLogger(HttpStoragePluginConfig.class);
+ private static final int DEFAULT_RATE_LIMIT = 1000;
public static final String NAME = "http";
public final Map<String, HttpApiConfig> connections;
@@ -55,11 +58,13 @@
* Timeout in {@link TimeUnit#SECONDS}.
*/
public final int timeout;
+ public final int retryDelay;
@JsonCreator
public HttpStoragePluginConfig(@JsonProperty("cacheResults") Boolean cacheResults,
@JsonProperty("connections") Map<String, HttpApiConfig> connections,
@JsonProperty("timeout") Integer timeout,
+ @JsonProperty("retryDelay") Integer retryDelay,
@JsonProperty("username") String username,
@JsonProperty("password") String password,
@JsonProperty("proxyHost") String proxyHost,
@@ -84,6 +89,7 @@
AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER),
oAuthConfig);
this.cacheResults = cacheResults != null && cacheResults;
+ this.retryDelay = (retryDelay == null || retryDelay < 0) ? DEFAULT_RATE_LIMIT : retryDelay;
this.connections = CaseInsensitiveMap.newHashMap();
if (connections != null) {
@@ -121,6 +127,7 @@
this.proxyPort = that.proxyPort;
this.proxyType = that.proxyType;
this.oAuthConfig = that.oAuthConfig;
+ this.retryDelay = that.retryDelay;
}
/**
@@ -139,6 +146,7 @@
this.proxyPort = that.proxyPort;
this.proxyType = that.proxyType;
this.oAuthConfig = that.oAuthConfig;
+ this.retryDelay = that.retryDelay;
}
private static String normalize(String value) {
@@ -158,7 +166,7 @@
return new HttpStoragePluginConfig(
cacheResults,
configFor(connectionName),
- timeout,
+ timeout, retryDelay,
username(),
password(),
proxyHost,
@@ -189,6 +197,7 @@
return Objects.equals(connections, thatConfig.connections) &&
Objects.equals(cacheResults, thatConfig.cacheResults) &&
Objects.equals(proxyHost, thatConfig.proxyHost) &&
+ Objects.equals(retryDelay, thatConfig.retryDelay) &&
Objects.equals(proxyPort, thatConfig.proxyPort) &&
Objects.equals(proxyType, thatConfig.proxyType) &&
Objects.equals(oAuthConfig, thatConfig.oAuthConfig) &&
@@ -202,6 +211,7 @@
.field("connections", connections)
.field("cacheResults", cacheResults)
.field("timeout", timeout)
+ .field("retryDelay", retryDelay)
.field("proxyHost", proxyHost)
.field("proxyPort", proxyPort)
.field("credentialsProvider", credentialsProvider)
@@ -213,7 +223,7 @@
@Override
public int hashCode() {
- return Objects.hash(connections, cacheResults, timeout,
+ return Objects.hash(connections, cacheResults, timeout, retryDelay,
proxyHost, proxyPort, proxyType, oAuthConfig, credentialsProvider, authMode);
}
@@ -226,6 +236,11 @@
@JsonProperty("timeout")
public int timeout() { return timeout;}
+ @JsonProperty("retryDelay")
+ public int retryDelay() {
+ return retryDelay;
+ }
+
@JsonProperty("proxyHost")
public String proxyHost() { return proxyHost; }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index e751f61..d0f12f2 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -123,6 +123,7 @@
private final HttpStoragePluginConfig pluginConfig;
private final HttpApiConfig apiConfig;
private final OAuthConfig oAuthConfig;
+ private final int rateLimit;
private String responseMessage;
private int responseCode;
private String responseProtocol;
@@ -140,6 +141,7 @@
this.filters = scanDefn.filters();
this.url = url;
this.tempDir = tempDir;
+ this.rateLimit = scanDefn.tableSpec().config().retryDelay();
this.proxyConfig = proxyConfig;
this.errorContext = errorContext;
this.tokenTable = scanDefn.tableSpec().getTokenTable();
@@ -162,7 +164,7 @@
*/
public SimpleHttp(HttpUrl url, File tempDir, HttpProxyConfig proxyConfig, CustomErrorContext errorContext,
Paginator paginator, PersistentTokenTable tokenTable, HttpStoragePluginConfig pluginConfig,
- HttpApiConfig endpointConfig, String connection, Map<String, String> filters) {
+ HttpApiConfig endpointConfig, String connection, Map<String, String> filters, int rateLimit) {
this.url = url;
this.tempDir = tempDir;
this.proxyConfig = proxyConfig;
@@ -185,6 +187,7 @@
this.apiConfig = endpointConfig;
this.connection = connection;
this.filters = filters;
+ this.rateLimit = rateLimit;
this.oAuthConfig = pluginConfig.oAuthConfig();
this.client = setupHttpClient();
}
@@ -240,13 +243,13 @@
builder.connectTimeout(timeout, TimeUnit.SECONDS);
builder.writeTimeout(timeout, TimeUnit.SECONDS);
builder.readTimeout(timeout, TimeUnit.SECONDS);
+ builder.addInterceptor(new RateLimitInterceptor(rateLimit));
// OkHttp's connection pooling is disabled because the HTTP plugin creates
// and discards potentially many OkHttp clients, each leaving lingering
// CLOSE_WAIT connections around if they have pooling enabled.
builder.connectionPool(new ConnectionPool(0, 1, TimeUnit.SECONDS));
// Code to skip SSL Certificate validation
- // Sourced from https://stackoverflow.com/questions/60110848/how-to-disable-ssl-verification
if (! apiConfig.verifySSLCert()) {
try {
TrustManager[] trustAllCerts = getAllTrustingTrustManager();
@@ -1023,6 +1026,38 @@
}
}
+ /**
+ * This interceptor is used in pagination situations or elsewhere when APIs have burst throttling. The rate limit interceptor
+ * will wait a configurable number of milliseconds and retry queries if it encounters a 429
+ * response code.
+ */
+ public static class RateLimitInterceptor implements Interceptor {
+ private final int millis;
+ public RateLimitInterceptor(int millis) {
+ this.millis = millis;
+ }
+
+ @NotNull
+ @Override
+ public Response intercept(Chain chain) throws IOException {
+
+ Response response = chain.proceed(chain.request());
+ // 429 is how the api indicates a rate limit error
+ if (!response.isSuccessful() && response.code() == 429) {
+ logger.info("Received 429 Response. Throttling API calls: {} ", response.message());
+ // Wait and retry request
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ logger.error("Error retrying HTTP request: {}", e.getMessage());
+ }
+ response = chain.proceed(chain.request());
+ }
+ return response;
+ }
+ }
+
+
public static class SimpleHttpBuilder {
private HttpSubScan scanDefn;
private HttpUrl url;
@@ -1037,6 +1072,7 @@
private Map<String,String> filters;
private String connection;
private String username;
+ private int rateLimit;
public SimpleHttpBuilder scanDefn(HttpSubScan scanDefn) {
this.scanDefn = scanDefn;
@@ -1046,6 +1082,7 @@
this.tokenTable = scanDefn.tableSpec().getTokenTable();
this.filters = scanDefn.filters();
this.username = scanDefn.getUserName();
+ this.rateLimit = scanDefn.tableSpec().config().retryDelay();
return this;
}
@@ -1079,6 +1116,11 @@
return this;
}
+ public SimpleHttpBuilder rateLimit(int rateLimit) {
+ this.rateLimit = rateLimit;
+ return this;
+ }
+
public SimpleHttpBuilder tokenTable(PersistentTokenTable tokenTable) {
this.tokenTable = tokenTable;
return this;
@@ -1105,12 +1147,11 @@
return this;
}
-
public SimpleHttp build() {
if (this.scanDefn != null) {
return new SimpleHttp(scanDefn, url, tempDir, proxyConfig, errorContext, paginator);
} else {
- return new SimpleHttp(url, tempDir, proxyConfig, errorContext, paginator, tokenTable, pluginConfig, endpointConfig, connection, filters);
+ return new SimpleHttp(url, tempDir, proxyConfig, errorContext, paginator, tokenTable, pluginConfig, endpointConfig, connection, filters, rateLimit);
}
}
}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 928975f..c09e3a5 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -138,7 +138,8 @@
configs.put("pokemon", pokemonConfig);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
+ new HttpStoragePluginConfig(false, configs, 10, 1000, null, null, "", 80, "", "", "", null, PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER,
+ AuthMode.SHARED_USER.name());
mockStorageConfigWithWorkspace.setEnabled(true);
cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
}
@@ -355,7 +356,7 @@
configs.put("malformedJson", mockJsonWithMalformedData);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
+ new HttpStoragePluginConfig(false, configs, 2, 1000, "globaluser", "globalpass", "",
80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
UsernamePasswordCredentials.USERNAME, "globaluser",
UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index c5901b9..e176d7c 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -117,7 +117,7 @@
configs.put("basicJson", basicJson);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, 200, "globaluser", "globalpass", "",
+ new HttpStoragePluginConfig(false, configs, 200, 1000, "globaluser", "globalpass", "",
80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
UsernamePasswordCredentials.USERNAME, "globaluser",
UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFWithAliases.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFWithAliases.java
index 15152de..1bd574c 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFWithAliases.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFWithAliases.java
@@ -116,7 +116,7 @@
configs.put("basicJson", basicJson);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, 200, "globaluser", "globalpass", "",
+ new HttpStoragePluginConfig(false, configs, 200, 1000, "globaluser", "globalpass", "",
80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
UsernamePasswordCredentials.USERNAME, "globaluser",
UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
index f38512d..695c60f 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthProcess.java
@@ -111,7 +111,7 @@
// Add storage plugin for test OAuth
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, TIMEOUT, null, null, "", 80, "", "", "",
+ new HttpStoragePluginConfig(false, configs, TIMEOUT, 1000, null, null, "", 80, "", "", "",
oAuthConfig, credentialsProvider, AuthMode.SHARED_USER.name());
mockStorageConfigWithWorkspace.setEnabled(true);
cluster.defineStoragePlugin("localOauth", mockStorageConfigWithWorkspace);
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
index 0490005..2e818ae 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestOAuthTokenUpdate.java
@@ -91,7 +91,7 @@
// Add storage plugin for test OAuth
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, TIMEOUT,null, null, "", 80, "", "", "",
+ new HttpStoragePluginConfig(false, configs, TIMEOUT, 1000, null, null, "", 80, "", "", "",
oAuthConfig, credentialsProvider, AuthMode.SHARED_USER.name());
mockStorageConfigWithWorkspace.setEnabled(true);
cluster.defineStoragePlugin("localOauth", mockStorageConfigWithWorkspace);
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
index d04f230..2334315 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestPagination.java
@@ -132,7 +132,7 @@
configs.put("github", githubConfig);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, 10, null, null, "", 80, "", "", "", null,
+ new HttpStoragePluginConfig(false, configs, 10, 1000, null, null, "", 80, "", "", "", null,
PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
mockStorageConfigWithWorkspace.setEnabled(true);
cluster.defineStoragePlugin("live", mockStorageConfigWithWorkspace);
@@ -268,7 +268,7 @@
configs.put("xml_paginator_url_params", mockXmlConfigWithPaginatorAndUrlParams);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", null,
+ new HttpStoragePluginConfig(false, configs, 2,1000, null, null, "", 80, "", "", "", null,
PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, AuthMode.SHARED_USER.name());
mockStorageConfigWithWorkspace.setEnabled(true);
cluster.defineStoragePlugin("local", mockStorageConfigWithWorkspace);
@@ -315,6 +315,33 @@
}
@Test
+ public void simpleJSONPaginatorQueryWith429() throws Exception {
+ // This test simulates an http request that hits a burst limit. In this situation,
+ // Drill will wait and retry the request.
+ String sql = "SELECT * FROM `local`.`json_paginator` LIMIT 4";
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE1));
+ server.enqueue(new MockResponse().setResponseCode(429));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE2));
+ server.enqueue(new MockResponse().setResponseCode(429));
+ server.enqueue(new MockResponse().setResponseCode(200).setBody(TEST_JSON_PAGE3));
+
+ List<QueryDataBatch> results = client.queryBuilder()
+ .sql(sql)
+ .results();
+
+ int count = 0;
+ for(QueryDataBatch b : results){
+ count += b.getHeader().getRowCount();
+ b.release();
+ }
+ assertEquals(2, results.size());
+ assertEquals(4, count);
+ }
+ }
+
+ @Test
public void simpleJSONIndexQuery() throws Exception {
String sql = "SELECT * FROM `local`.`json_index` LIMIT 4";
try (MockWebServer server = startServer()) {
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
index 1028668..281320c 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestProvidedSchema.java
@@ -160,7 +160,7 @@
configs.put("noSchema", noSchema);
HttpStoragePluginConfig mockStorageConfigWithWorkspace =
- new HttpStoragePluginConfig(false, configs, 2, "globaluser", "globalpass", "",
+ new HttpStoragePluginConfig(false, configs, 2, 1000, "globaluser", "globalpass", "",
80, "", "", "", null, new PlainCredentialsProvider(ImmutableMap.of(
UsernamePasswordCredentials.USERNAME, "globaluser",
UsernamePasswordCredentials.PASSWORD, "globalpass")), AuthMode.SHARED_USER.name());
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
index cb3c74c..2252dae 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestUserTranslationInHttpPlugin.java
@@ -133,11 +133,11 @@
PlainCredentialsProvider credentialsProvider = new PlainCredentialsProvider(TEST_USER_2, credentials);
- HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", null, credentialsProvider,
+ HttpStoragePluginConfig mockStorageConfigWithWorkspace = new HttpStoragePluginConfig(false, configs, 2, 1000, null, null, "", 80, "", "", "", null, credentialsProvider,
AuthMode.USER_TRANSLATION.name());
mockStorageConfigWithWorkspace.setEnabled(true);
- HttpStoragePluginConfig mockOAuthPlugin = new HttpStoragePluginConfig(false, configs, 2, null, null, "", 80, "", "", "", oAuthConfig, oauthCredentialProvider,
+ HttpStoragePluginConfig mockOAuthPlugin = new HttpStoragePluginConfig(false, configs, 2, 1000, null, null, "", 80, "", "", "", oAuthConfig, oauthCredentialProvider,
AuthMode.USER_TRANSLATION.name());
mockOAuthPlugin.setEnabled(true);