blob: f0ae1269044368c09ac265c4fc633bececa3df5e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.HttpClientConnection;
import org.apache.http.HttpConnectionMetrics;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpVersion;
import org.apache.http.client.HttpClient;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ConnectionRequest;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHttpRequest;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.util.TestInjection;
import org.junit.BeforeClass;
import org.junit.Test;
@SuppressSSL
public class ConnectionReuseTest extends SolrCloudTestCase {
private AtomicInteger id = new AtomicInteger();
private HttpClientContext context = HttpClientContext.create();
private static final String COLLECTION = "collection1";
@BeforeClass
public static void setupCluster() throws Exception {
TestInjection.failUpdateRequests = "true:100";
configureCluster(1)
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
CollectionAdminRequest.createCollection(COLLECTION, "config", 1, 1)
.processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
}
private SolrClient buildClient(CloseableHttpClient httpClient, URL url) {
switch (random().nextInt(3)) {
case 0:
// currently only testing with 1 thread
return getConcurrentUpdateSolrClient(url.toString() + "/" + COLLECTION, httpClient, 6, 1);
case 1:
return getHttpSolrClient(url.toString() + "/" + COLLECTION, httpClient);
case 2:
CloudSolrClient client = getCloudSolrClient(cluster.getZkServer().getZkAddress(), random().nextBoolean(), httpClient, 30000, 60000);
client.setDefaultCollection(COLLECTION);
return client;
}
throw new RuntimeException("impossible");
}
@Test
public void testConnectionReuse() throws Exception {
URL url = cluster.getJettySolrRunners().get(0).getBaseUrl();
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
CloseableHttpClient httpClient = HttpClientUtil.createClient(null, cm);
try (SolrClient client = buildClient(httpClient, url)) {
HttpHost target = new HttpHost(url.getHost(), url.getPort(), isSSLMode() ? "https" : "http");
HttpRoute route = new HttpRoute(target);
ConnectionRequest mConn = getClientConnectionRequest(httpClient, route, cm);
HttpClientConnection conn1 = getConn(mConn);
headerRequest(target, route, conn1, cm);
cm.releaseConnection(conn1, null, -1, TimeUnit.MILLISECONDS);
int queueBreaks = 0;
int cnt1 = atLeast(3);
int cnt2 = atLeast(30);
for (int j = 0; j < cnt1; j++) {
boolean done = false;
for (int i = 0; i < cnt2; i++) {
AddUpdateCommand c = new AddUpdateCommand(null);
c.solrDoc = sdoc("id", id.incrementAndGet());
try {
client.add(c.solrDoc);
} catch (Exception e) {
e.printStackTrace();
}
if (!done && i > 0 && i < cnt2 - 1 && client instanceof ConcurrentUpdateSolrClient
&& random().nextInt(10) > 8) {
queueBreaks++;
done = true;
Thread.sleep(350); // wait past streaming client poll time of 250ms
}
}
if (client instanceof ConcurrentUpdateSolrClient) {
((ConcurrentUpdateSolrClient) client).blockUntilFinished();
}
}
route = new HttpRoute(new HttpHost(url.getHost(), url.getPort(), isSSLMode() ? "https" : "http"));
mConn = cm.requestConnection(route, HttpSolrClient.cacheKey);
HttpClientConnection conn2 = getConn(mConn);
HttpConnectionMetrics metrics = conn2.getMetrics();
headerRequest(target, route, conn2, cm);
cm.releaseConnection(conn2, null, -1, TimeUnit.MILLISECONDS);
assertNotNull("No connection metrics found - is the connection getting aborted? server closing the connection? "
+ client.getClass().getSimpleName(), metrics);
// we try and make sure the connection we get has handled all of the requests in this test
if (client instanceof ConcurrentUpdateSolrClient) {
// we can't fully control queue polling breaking up requests - allow a bit of leeway
int exp = cnt1 + queueBreaks + 2;
assertTrue(
"We expected all communication via streaming client to use one connection! expected=" + exp + " got="
+ metrics.getRequestCount(),
Math.max(exp, metrics.getRequestCount()) - Math.min(exp, metrics.getRequestCount()) < 3);
} else {
assertTrue("We expected all communication to use one connection! " + client.getClass().getSimpleName() + " "
+ metrics.getRequestCount(),
cnt1 * cnt2 + 2 <= metrics.getRequestCount());
}
}
finally {
HttpClientUtil.close(httpClient);
}
}
public HttpClientConnection getConn(ConnectionRequest mConn)
throws InterruptedException, ConnectionPoolTimeoutException, ExecutionException {
HttpClientConnection conn = mConn.get(30, TimeUnit.SECONDS);
return conn;
}
public void headerRequest(HttpHost target, HttpRoute route, HttpClientConnection conn, PoolingHttpClientConnectionManager cm)
throws IOException, HttpException {
HttpRequest req = new BasicHttpRequest("OPTIONS", "*", HttpVersion.HTTP_1_1);
req.addHeader("Host", target.getHostName());
if (!conn.isOpen()) {
// establish connection based on its route info
cm.connect(conn, route, 1000, context);
// and mark it as route complete
cm.routeComplete(conn, route, context);
}
conn.sendRequestHeader(req);
conn.flush();
conn.receiveResponseHeader();
}
public ConnectionRequest getClientConnectionRequest(HttpClient httpClient, HttpRoute route, PoolingHttpClientConnectionManager cm) {
ConnectionRequest mConn = cm.requestConnection(route, HttpSolrClient.cacheKey);
return mConn;
}
}