Merge pull request #697 from sebastian-nagel/NUTCH-2896-okhttp-connection-pool
NUTCH-2896 Protocol-okhttp: make connection pool configurable
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index bb9aae1..1ad02a0 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -428,6 +428,27 @@
</description>
</property>
+<property>
+ <name>http.connection.pool.okhttp</name>
+ <value></value>
+ <description>
+ (EXPERT) Comma-separated list of 3 positive integers M,N,T:
+ instantiate protocol-okhttp with M connection pools, each holding
+ at most N idle connections with a keep-alive time of T seconds.
+ The product M*N should roughly match the number of hosts crawled
+ by one fetcher task within the keep-alive time. Because OkHttp's
+ connection pool (v4.9.1) is not optimized for fast connection
+ look-up, N should not exceed 1000; instead, multiple pools are
+ used and connections are distributed over them by host name. If
+ the property is undefined or empty, a single pool with OkHttp's
+ default size and keep-alive time is used; invalid values are
+ ignored with a warning. For OkHttp 4.9.1 the default is equivalent
+ to "1,5,300": one pool with 5 idle connections and a 300-second
+ keep-alive time. See also NUTCH-2896 and
+ https://square.github.io/okhttp/3.x/okhttp/okhttp3/ConnectionPool.html
+ </description>
+</property>
+
<!-- FTP properties -->
<property>
diff --git a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttp.java b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttp.java
index 9cf9779..63fa328 100644
--- a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttp.java
+++ b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttp.java
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
@@ -51,6 +52,7 @@
import okhttp3.Authenticator;
import okhttp3.Connection;
+import okhttp3.ConnectionPool;
import okhttp3.Headers;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
@@ -65,7 +67,8 @@
private final List<String[]> customRequestHeaders = new LinkedList<>();
- private OkHttpClient client;
+ /** clients, each holding a separate connection pool */
+ private OkHttpClient[] clients;
private static final TrustManager[] trustAllCerts = new TrustManager[] {
new X509TrustManager() {
@@ -216,7 +219,44 @@
// enable support for Brotli compression (Content-Encoding)
builder.addInterceptor(BrotliInterceptor.INSTANCE);
- this.client = builder.build();
+ // instantiate connection pool(s), cf.
+ // https://square.github.io/okhttp/3.x/okhttp/okhttp3/ConnectionPool.html
+ int numConnectionPools = 1;
+ Supplier<ConnectionPool> poolSupplier = null;
+ if (conf.get("http.connection.pool.okhttp", "").isEmpty()) {
+ // empty pool configuration: use a single pool of default size
+ } else {
+ int[] poolConfig = {};
+ try {
+ poolConfig = conf.getInts("http.connection.pool.okhttp");
+ } catch (NumberFormatException e) {
+ // will show warning below
+ }
+ if (poolConfig.length == 3 && poolConfig[0] > 0
+ && poolConfig[1] > 0 && poolConfig[2] > 0) {
+ numConnectionPools = poolConfig[0];
+ int size = poolConfig[1];
+ int time = poolConfig[2];
+ poolSupplier = () -> new ConnectionPool(size, time, TimeUnit.SECONDS);
+ LOG.info(
+ "Using {} connection pool{} with max. {} idle connections "
+ + "and {} sec. connection keep-alive time",
+ poolConfig[0], (poolConfig[0] > 1 ? "s" : ""), poolConfig[1],
+ poolConfig[2]);
+ } else {
+ LOG.warn(
+ "Ignoring invalid connection pool configuration 'http.connection.pool.okhttp': '{}'",
+ conf.get("http.connection.pool.okhttp"));
+ }
+ }
+ if (poolSupplier == null) {
+ poolSupplier = ConnectionPool::new;
+ LOG.info("Using single connection pool with default settings");
+ }
+ this.clients = new OkHttpClient[numConnectionPools];
+ for (int i = 0; i < numConnectionPools; i++) {
+ this.clients[i] = builder.connectionPool(poolSupplier.get()).build();
+ }
}
class HTTPHeadersInterceptor implements Interceptor {
@@ -320,8 +360,19 @@
return this.customRequestHeaders;
}
- protected OkHttpClient getClient() {
- return this.client;
+ /**
+ * Select the client (and thus connection pool) for the URL's host. The
+ * host name is lower-cased before hashing so case variants share a pool.
+ * @param url
+ * URL to fetch
+ * @return client responsible to fetch the given URL
+ */
+ protected OkHttpClient getClient(URL url) {
+ if (this.clients.length == 1) {
+ return this.clients[0];
+ }
+ int hash = url.getHost().toLowerCase(Locale.ROOT).hashCode();
+ return this.clients[(hash & Integer.MAX_VALUE) % this.clients.length];
}
@Override
diff --git a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
index 6dcbe16..5ec6a9b 100644
--- a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
+++ b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
@@ -101,7 +101,7 @@
}
Request request = rb.build();
- okhttp3.Call call = okhttp.getClient().newCall(request);
+ okhttp3.Call call = okhttp.getClient(url).newCall(request);
// ensure that Response and underlying ResponseBody are closed
try (okhttp3.Response response = call.execute()) {