Merge pull request #697 from sebastian-nagel/NUTCH-2896-okhttp-connection-pool

NUTCH-2896 Protocol-okhttp: make connection pool configurable
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index bb9aae1..1ad02a0 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -428,6 +428,27 @@
   </description>
 </property>
 
+<property>
+  <name>http.connection.pool.okhttp</name>
+  <value></value>
+  <description>
+    (EXPERT) Comma-separated list of 3 positive integers M,N,T: instantiate
+    protocol-okhttp with M connection pools, each holding at most N idle
+    connections which are evicted after T seconds of inactivity. The
+    product M*N should roughly match the number of hosts crawled by one
+    fetcher task within the keep-alive time T. Because OkHttp's connection
+    pool (v4.9.1) is not optimized for fast look-up of connections, N
+    should not exceed 1000; to keep look-ups efficient, multiple pools are
+    instantiated and connections are distributed over the pools by target
+    host name. If the property is undefined or empty, or if its value is
+    invalid (a warning is logged), a single pool with OkHttp's default size
+    and keep-alive time is used. For OkHttp 4.9.1 the default is equivalent
+    to the configuration value &quot;1,5,300&quot; - one pool holding at most
+    5 idle connections with a keep-alive time of 300 seconds. See also
+    NUTCH-2896 and https://square.github.io/okhttp/3.x/okhttp/okhttp3/ConnectionPool.html
+  </description>
+</property>
+
 <!-- FTP properties -->
 
 <property>
diff --git a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttp.java b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttp.java
index 9cf9779..63fa328 100644
--- a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttp.java
+++ b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttp.java
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.SSLContext;
@@ -51,6 +52,7 @@
 
 import okhttp3.Authenticator;
 import okhttp3.Connection;
+import okhttp3.ConnectionPool;
 import okhttp3.Headers;
 import okhttp3.Interceptor;
 import okhttp3.OkHttpClient;
@@ -65,7 +67,8 @@
 
   private final List<String[]> customRequestHeaders = new LinkedList<>();
 
-  private OkHttpClient client;
+  /** clients, each holding a separate connection pool */
+  private OkHttpClient[] clients;
 
   private static final TrustManager[] trustAllCerts = new TrustManager[] {
       new X509TrustManager() {
@@ -216,7 +219,44 @@
     // enable support for Brotli compression (Content-Encoding)
     builder.addInterceptor(BrotliInterceptor.INSTANCE);
 
-    this.client = builder.build();
+    // instantiate connection pool(s), cf.
+    // https://square.github.io/okhttp/3.x/okhttp/okhttp3/ConnectionPool.html
+    int numConnectionPools = 1;
+    Supplier<ConnectionPool> poolSupplier = null;
+    if (conf.get("http.connection.pool.okhttp", "").isEmpty()) {
+      // empty pool configuration: use a single pool of default size
+    } else {
+      int[] poolConfig = {};
+      try {
+        poolConfig = conf.getInts("http.connection.pool.okhttp");
+      } catch (NumberFormatException e) {
+        // will show warning below
+      }
+      if (poolConfig.length == 3 && poolConfig[0] > 0
+          && poolConfig[1] > 0 && poolConfig[2] > 0) {
+        numConnectionPools = poolConfig[0];
+        int size = poolConfig[1];
+        int time = poolConfig[2];
+        poolSupplier = () -> new ConnectionPool(size, time, TimeUnit.SECONDS);
+        LOG.info(
+            "Using {} connection pool{} with max. {} idle connections "
+                + "and {} sec. connection keep-alive time",
+            poolConfig[0], (poolConfig[0] > 1 ? "s" : ""), poolConfig[1],
+            poolConfig[2]);
+      } else {
+        LOG.warn(
+            "Ignoring invalid connection pool configuration 'http.connection.pool.okhttp': '{}'",
+            conf.get("http.connection.pool.okhttp"));
+      }
+    }
+    if (poolSupplier == null) {
+      poolSupplier = ConnectionPool::new;
+      LOG.info("Using single connection pool with default settings");
+    }
+    this.clients = new OkHttpClient[numConnectionPools];
+    for (int i = 0; i < numConnectionPools; i++) {
+      this.clients[i] = builder.connectionPool(poolSupplier.get()).build();
+    }
   }
 
   class HTTPHeadersInterceptor implements Interceptor {
@@ -320,8 +360,19 @@
     return this.customRequestHeaders;
   }
 
-  protected OkHttpClient getClient() {
-    return this.client;
+  /**
+   * Select the client, and thereby the connection pool, for a URL by host.
+   * 
+   * @param url
+   *          URL to fetch; URLs with identical (case-sensitive) host share a client
+   * @return client responsible to fetch the given URL
+   */
+  protected OkHttpClient getClient(URL url) {
+    if (this.clients.length == 1) {
+      return this.clients[0];
+    }
+    int hash = url.getHost().hashCode();
+    return this.clients[(hash & Integer.MAX_VALUE) % this.clients.length]; // mask sign bit
   }
 
   @Override
diff --git a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
index 6dcbe16..5ec6a9b 100644
--- a/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
+++ b/src/plugin/protocol-okhttp/src/java/org/apache/nutch/protocol/okhttp/OkHttpResponse.java
@@ -101,7 +101,7 @@
     }
 
     Request request = rb.build();
-    okhttp3.Call call = okhttp.getClient().newCall(request);
+    okhttp3.Call call = okhttp.getClient(url).newCall(request);
 
     // ensure that Response and underlying ResponseBody are closed
     try (okhttp3.Response response = call.execute()) {