SOLR-17211: HttpJdkSolrClient Support Async requests (#2374)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 8e7c179..e4bf541 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -126,6 +126,8 @@
* SOLR-17164: Add 2 arg variant of vectorSimilarity() function (Sanjay Dutt, hossman)
+* SOLR-17211: New SolrJ JDK client supports Async (James Dyer)
+
Optimizations
---------------------
* SOLR-17144: Close searcherExecutor thread per core after 1 minute (Pierre Salagnac, Christine Poerschke)
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc
index 32bf0ac..2999599 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/solrj.adoc
@@ -98,7 +98,7 @@
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/HttpSolrClient.html[`HttpSolrClient`] - geared towards query-centric workloads, though also a good general-purpose client.
Communicates directly with a single Solr node.
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/Http2SolrClient.html[`Http2SolrClient`] - async, non-blocking and general-purpose client that leverage HTTP/2 using the Jetty Http library.
-- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.html[`HttpJdkSolrClient`] - General-purpose client using the JDK's built-in Http Client. Supports both Http/2 and Http/1.1. Targeted for those users wishing to minimize application dependencies.
+- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.html[`HttpJdkSolrClient`] - General-purpose client using the JDK's built-in Http Client. Supports both Http/2 and Http/1.1. Supports async. Targeted for those users wishing to minimize application dependencies.
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrClient.html[`LBHttpSolrClient`] - balances request load across a list of Solr nodes.
Adjusts the list of "in-service" nodes based on node health.
- {solr-javadocs}/solrj/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.html[`LBHttp2SolrClient`] - just like `LBHttpSolrClient` but using `Http2SolrClient` instead, with the Jetty Http library.
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 6a97bd1..c0277b1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -422,6 +422,7 @@
private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {};
+ @Override
public Cancellable asyncRequest(
SolrRequest<?> solrRequest,
String collection,
@@ -470,7 +471,7 @@
}
}
};
-
+ asyncListener.onStart();
req = makeRequestAndSend(solrRequest, url, listener, true);
} catch (SolrServerException | IOException e) {
asyncListener.onFailure(e);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
index 35e56ae..1474dc1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
@@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -51,6 +52,8 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.util.AsyncListener;
+import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -80,7 +83,7 @@
private boolean forceHttp11;
- private boolean shutdownExecutor;
+ private final boolean shutdownExecutor;
protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder builder) {
super(serverBaseUrl, builder);
@@ -134,8 +137,77 @@
}
@Override
+ public Cancellable asyncRequest(
+ SolrRequest<?> solrRequest,
+ String collection,
+ AsyncListener<NamedList<Object>> asyncListener) {
+ try {
+ PreparedRequest pReq = prepareRequest(solrRequest, collection);
+ asyncListener.onStart();
+ CompletableFuture<NamedList<Object>> response =
+ httpClient
+ .sendAsync(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream())
+ .thenApply(
+ httpResponse -> {
+ try {
+ return processErrorsAndResponse(
+ solrRequest, pReq.parserToUse, httpResponse, pReq.url);
+ } catch (SolrServerException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .whenComplete(
+ (nl, t) -> {
+ if (t != null) {
+ asyncListener.onFailure(t);
+ } else {
+ asyncListener.onSuccess(nl);
+ }
+ });
+ return new HttpSolrClientCancellable(response);
+ } catch (Exception e) {
+ asyncListener.onFailure(e);
+ return () -> {};
+ }
+ }
+
+ @Override
public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
+ PreparedRequest pReq = prepareRequest(solrRequest, collection);
+ HttpResponse<InputStream> response = null;
+ try {
+ response = httpClient.send(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
+ return processErrorsAndResponse(solrRequest, pReq.parserToUse, response, pReq.url);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (HttpTimeoutException e) {
+ throw new SolrServerException(
+ "Timeout occurred while waiting response from server at: " + pReq.url, e);
+ } catch (SolrException se) {
+ throw se;
+ } catch (RuntimeException re) {
+ throw new SolrServerException(re);
+ } finally {
+ if (pReq.contentWritingFuture != null) {
+ pReq.contentWritingFuture.cancel(true);
+ }
+
+ // See
+ // https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscribers.html#ofInputStream()
+ if (!wantStream(pReq.parserToUse)) {
+ try {
+ response.body().close();
+ } catch (Exception e1) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ private PreparedRequest prepareRequest(SolrRequest<?> solrRequest, String collection)
+ throws SolrServerException, IOException {
checkClosed();
if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest)) {
collection = defaultCollection;
@@ -143,19 +215,19 @@
String url = getRequestPath(solrRequest, collection);
ResponseParser parserToUse = responseParser(solrRequest);
ModifiableSolrParams queryParams = initalizeSolrParams(solrRequest, parserToUse);
- HttpResponse<InputStream> resp = null;
+ var reqb = HttpRequest.newBuilder();
+ PreparedRequest pReq = null;
try {
- var reqb = HttpRequest.newBuilder();
switch (solrRequest.getMethod()) {
case GET:
{
- resp = doGet(url, reqb, solrRequest, queryParams);
+ pReq = prepareGet(url, reqb, solrRequest, queryParams);
break;
}
case POST:
case PUT:
{
- resp = doPutOrPost(url, solrRequest.getMethod(), reqb, solrRequest, queryParams);
+ pReq = preparePutOrPost(url, solrRequest.getMethod(), reqb, solrRequest, queryParams);
break;
}
default:
@@ -163,50 +235,34 @@
throw new IllegalStateException("Unsupported method: " + solrRequest.getMethod());
}
}
- return processErrorsAndResponse(solrRequest, parserToUse, resp, url);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (HttpTimeoutException e) {
- throw new SolrServerException(
- "Timeout occurred while waiting response from server at: " + url, e);
- } catch (SolrException se) {
- throw se;
} catch (URISyntaxException | RuntimeException re) {
throw new SolrServerException(re);
- } finally {
- // See
- // https://docs.oracle.com/en/java/javase/17/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscribers.html#ofInputStream()
- if (!wantStream(parserToUse)) {
- try {
- resp.body().close();
- } catch (Exception e1) {
- // ignore
- }
- }
}
+ pReq.parserToUse = parserToUse;
+ pReq.url = url;
+ return pReq;
}
- private HttpResponse<InputStream> doGet(
+ private PreparedRequest prepareGet(
String url,
HttpRequest.Builder reqb,
SolrRequest<?> solrRequest,
ModifiableSolrParams queryParams)
- throws IOException, InterruptedException, URISyntaxException {
+ throws IOException, URISyntaxException {
validateGetRequest(solrRequest);
reqb.GET();
decorateRequest(reqb, solrRequest);
reqb.uri(new URI(url + "?" + queryParams));
- return httpClient.send(reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
+ return new PreparedRequest(reqb, null);
}
- private HttpResponse<InputStream> doPutOrPost(
+ private PreparedRequest preparePutOrPost(
String url,
SolrRequest.METHOD method,
HttpRequest.Builder reqb,
SolrRequest<?> solrRequest,
ModifiableSolrParams queryParams)
- throws IOException, InterruptedException, URISyntaxException {
+ throws IOException, URISyntaxException {
final RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest);
@@ -274,15 +330,21 @@
URI uriWithQueryParams = new URI(url + "?" + queryParams);
reqb.uri(uriWithQueryParams);
- HttpResponse<InputStream> response;
- try {
- response = httpClient.send(reqb.build(), HttpResponse.BodyHandlers.ofInputStream());
- } finally {
- if (contentWritingFuture != null) {
- contentWritingFuture.cancel(true);
- }
+ return new PreparedRequest(reqb, contentWritingFuture);
+ }
+
+ private static class PreparedRequest {
+ Future<?> contentWritingFuture;
+ HttpRequest.Builder reqb;
+
+ ResponseParser parserToUse;
+
+ String url;
+
+ PreparedRequest(HttpRequest.Builder reqb, Future<?> contentWritingFuture) {
+ this.reqb = reqb;
+ this.contentWritingFuture = contentWritingFuture;
}
- return response;
}
/**
@@ -469,6 +531,23 @@
.collect(Collectors.joining(", "));
}
+ protected static class HttpSolrClientCancellable implements Cancellable {
+ private final CompletableFuture<NamedList<Object>> response;
+
+ protected HttpSolrClientCancellable(CompletableFuture<NamedList<Object>> response) {
+ this.response = response;
+ }
+
+ @Override
+ public void cancel() {
+ response.cancel(true);
+ }
+
+ protected CompletableFuture<NamedList<Object>> getResponse() {
+ return response;
+ }
+ }
+
public static class Builder
extends HttpSolrClientBuilderBase<HttpJdkSolrClient.Builder, HttpJdkSolrClient> {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java
index c1171af..a491a11 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java
@@ -39,6 +39,8 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.util.AsyncListener;
+import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -368,6 +370,20 @@
protected abstract void updateDefaultMimeTypeForParser();
+ /**
+ * Execute an asynchronous request to a Solr collection
+ *
+ * @param solrRequest the request to perform
+ * @param collection if null the default collection is used
+ * @param asyncListener callers should provide an implementation to handle events: start, success,
+ * exception
+ * @return Cancellable allowing the caller to attempt cancellation
+ */
+ public abstract Cancellable asyncRequest(
+ SolrRequest<?> solrRequest,
+ String collection,
+ AsyncListener<NamedList<Object>> asyncListener);
+
public boolean isV2ApiRequest(final SolrRequest<?> request) {
return request instanceof V2Request || request.getPath().contains("/____v2");
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java
index c8e9ce7..7e135ac 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java
@@ -17,12 +17,18 @@
package org.apache.solr.client.solrj.util;
-/** Listener for async requests */
+/**
+ * Listener for async requests
+ *
+ * @param <T> The result type returned by the {@code onSuccess} method
+ */
public interface AsyncListener<T> {
/** Callback method invoked before processing the request */
default void onStart() {}
+ /** Callback method invoked when the request completes successfully */
void onSuccess(T t);
+ /** Callback method invoked when the request completes in failure */
void onFailure(Throwable throwable);
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
index 323916a..5269112 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
@@ -17,6 +17,15 @@
package org.apache.solr.client.solrj.util;
+/**
+ * The return type for solrJ asynchronous requests, providing a mechanism whereby callers may
+ * request cancellation.
+ */
public interface Cancellable {
+
+ /**
+ * Request to cancel the asynchronous request. This may be a no-op in some situations, for
+ * instance, if the request failed or otherwise is complete.
+ */
void cancel();
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugAsyncListener.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugAsyncListener.java
new file mode 100644
index 0000000..b3eaf5f
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugAsyncListener.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import org.apache.solr.client.solrj.util.AsyncListener;
+import org.apache.solr.common.util.NamedList;
+import org.junit.Assert;
+
+public class DebugAsyncListener implements AsyncListener<NamedList<Object>> {
+
+ private final CountDownLatch cdl;
+
+ private final Semaphore wait = new Semaphore(1);
+
+ public volatile boolean onStartCalled;
+
+ public volatile boolean latchCounted;
+
+ public volatile NamedList<Object> onSuccessResult = null;
+
+ public volatile Throwable onFailureResult = null;
+
+ public DebugAsyncListener(CountDownLatch cdl) {
+ this.cdl = cdl;
+ }
+
+ @Override
+ public void onStart() {
+ onStartCalled = true;
+ }
+
+ public void pause() {
+ try {
+ wait.acquire();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void unPause() {
+ wait.release();
+ }
+
+ @Override
+ public void onSuccess(NamedList<Object> entries) {
+ pause();
+ onSuccessResult = entries;
+ if (latchCounted) {
+ Assert.fail("either 'onSuccess' or 'onFailure' should be called exactly once.");
+ }
+ cdl.countDown();
+ latchCounted = true;
+ unPause();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ pause();
+ onFailureResult = throwable;
+ if (latchCounted) {
+ Assert.fail("either 'onSuccess' or 'onFailure' should be called exactly once.");
+ }
+ cdl.countDown();
+ latchCounted = true;
+ unPause();
+ }
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugServlet.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugServlet.java
index 272d922..9cfc31c 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugServlet.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugServlet.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@@ -39,7 +40,7 @@
queryString = null;
cookies = null;
responseHeaders = null;
- responseBody = null;
+ responseBodyByQueryFragment = new ConcurrentHashMap<>();
}
public static Integer errorCode = null;
@@ -49,7 +50,7 @@
public static String queryString = null;
public static javax.servlet.http.Cookie[] cookies = null;
public static List<String[]> responseHeaders = null;
- public static Object responseBody = null;
+ public static Map<String, Object> responseBodyByQueryFragment = new ConcurrentHashMap<>();
public static byte[] requestBody = null;
public static void setErrorCode(Integer code) {
@@ -136,6 +137,19 @@
resp.addHeader(h[0], h[1]);
}
}
+ String qs = req.getQueryString();
+ qs = qs == null ? "" : qs;
+ Object responseBody = null;
+
+ // Tests can set this up to return different response bodies based on substrings in the query
+ // string
+ for (Map.Entry<String, Object> entry : responseBodyByQueryFragment.entrySet()) {
+ if (qs.contains(entry.getKey())) {
+ responseBody = entry.getValue();
+ break;
+ }
+ }
+
if (responseBody != null) {
try {
if (responseBody instanceof String) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
index 5b4f7eb..a8831fd 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
@@ -264,6 +264,21 @@
}
@Test
+ public void testAsyncGet() throws Exception {
+ super.testQueryAsync();
+ }
+
+ @Test
+ public void testAsyncPost() throws Exception {
+ super.testUpdateAsync();
+ }
+
+ @Test
+ public void testAsyncException() throws Exception {
+ super.testAsyncExceptionBase();
+ }
+
+ @Test
public void testFollowRedirect() throws Exception {
final String clientUrl = getBaseUrl() + REDIRECT_SERVLET_PATH;
try (Http2SolrClient client =
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpJdkSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpJdkSolrClientTest.java
index d9e9e7a..81d1188 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpJdkSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpJdkSolrClientTest.java
@@ -28,6 +28,9 @@
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -42,11 +45,14 @@
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.response.SolrPingResponse;
+import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.SSLTestConfig;
import org.junit.After;
import org.junit.BeforeClass;
@@ -164,10 +170,11 @@
DebugServlet.clear();
if (rp instanceof XMLResponseParser) {
DebugServlet.addResponseHeader("Content-Type", "application/xml; charset=UTF-8");
- DebugServlet.responseBody = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<response />";
+ DebugServlet.responseBodyByQueryFragment.put(
+ "", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<response />");
} else {
DebugServlet.addResponseHeader("Content-Type", "application/octet-stream");
- DebugServlet.responseBody = javabinResponse();
+ DebugServlet.responseBodyByQueryFragment.put("", javabinResponse());
}
String url = getBaseUrl() + DEBUG_SERVLET_PATH;
SolrQuery q = new SolrQuery("foo");
@@ -192,6 +199,69 @@
}
@Test
+ public void testAsyncGet() throws Exception {
+ super.testQueryAsync();
+ }
+
+ @Test
+ public void testAsyncPost() throws Exception {
+ super.testUpdateAsync();
+ }
+
+ @Test
+ public void testAsyncException() throws Exception {
+ DebugAsyncListener listener = super.testAsyncExceptionBase();
+ assertTrue(listener.onFailureResult instanceof CompletionException);
+ CompletionException ce = (CompletionException) listener.onFailureResult;
+ assertTrue(ce.getCause() instanceof BaseHttpSolrClient.RemoteSolrException);
+ assertTrue(ce.getMessage(), ce.getMessage().contains("mime type"));
+ }
+
+ @Test
+ public void testAsyncAndCancel() throws Exception {
+ ResponseParser rp = new XMLResponseParser();
+ DebugServlet.clear();
+ DebugServlet.addResponseHeader("Content-Type", "application/xml; charset=UTF-8");
+ DebugServlet.responseBodyByQueryFragment.put(
+ "", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<response />");
+ String url = getBaseUrl() + DEBUG_SERVLET_PATH;
+ HttpJdkSolrClient.Builder b = builder(url).withResponseParser(rp);
+ CountDownLatch cdl = new CountDownLatch(0);
+ DebugAsyncListener listener = new DebugAsyncListener(cdl);
+ Cancellable cancelMe = null;
+ try (HttpJdkSolrClient client = b.build()) {
+ QueryRequest query = new QueryRequest(new MapSolrParams(Collections.singletonMap("id", "1")));
+
+ // We are pausing in the "whenComplete" stage, in the unlikely event the http request
+ // finishes before the test calls "cancel".
+ listener.pause();
+
+ // Make the request then immediately cancel it!
+ cancelMe = client.asyncRequest(query, "collection1", listener);
+ cancelMe.cancel();
+
+ // We are safe to unpause our client, having guaranteed that our cancel was before everything
+ // completed.
+ listener.unPause();
+ }
+
+ // "onStart" fires before the async call. This part of the request cannot be cancelled.
+ assertTrue(listener.onStartCalled);
+
+ // The client exposes the CompletableFuture to us via this inner class
+ assertTrue(cancelMe instanceof HttpJdkSolrClient.HttpSolrClientCancellable);
+ CompletableFuture<NamedList<Object>> response =
+ ((HttpJdkSolrClient.HttpSolrClientCancellable) cancelMe).getResponse();
+
+ // Even if our cancel didn't happen until we were at "whenComplete", the CompletableFuture will
+ // have set "isCancelled".
+ assertTrue(response.isCancelled());
+
+ // But we cannot guarantee the response will have been returned, or that "onFailure" was fired
+ // with a "CompletionException". This depends on where we were when the cancellation hit.
+ }
+
+ @Test
public void testTimeout() throws Exception {
SolrQuery q = new SolrQuery("*:*");
try (HttpJdkSolrClient client =
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java
index b1be49c..0897e2e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java
@@ -28,6 +28,8 @@
import java.util.Iterator;
import java.util.Locale;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrQuery;
@@ -36,9 +38,14 @@
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.SolrPing;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.util.Cancellable;
+import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.embedded.JettyConfig;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.BeforeClass;
@@ -52,6 +59,7 @@
protected static final String DEBUG_SERVLET_REGEX = DEBUG_SERVLET_PATH + "/*";
protected static final String REDIRECT_SERVLET_PATH = "/redirect";
protected static final String REDIRECT_SERVLET_REGEX = REDIRECT_SERVLET_PATH + "/*";
+ protected static final String COLLECTION_1 = "collection1";
@BeforeClass
public static void beforeTest() throws Exception {
@@ -305,13 +313,13 @@
try {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", "collection");
- baseUrlClient.add("collection1", doc);
- baseUrlClient.commit("collection1");
+ baseUrlClient.add(COLLECTION_1, doc);
+ baseUrlClient.commit(COLLECTION_1);
assertEquals(
1,
baseUrlClient
- .query("collection1", new SolrQuery("id:collection"))
+ .query(COLLECTION_1, new SolrQuery("id:collection"))
.getResults()
.getNumFound());
@@ -531,4 +539,112 @@
assertNull(
"No authorization headers expected. Headers: " + DebugServlet.headers, authorizationHeader);
}
+
+ protected void testUpdateAsync() throws Exception {
+ ResponseParser rp = new XMLResponseParser();
+ String url = getBaseUrl();
+ HttpSolrClientBuilderBase<?, ?> b =
+ builder(url, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT).withResponseParser(rp);
+ int limit = 10;
+ CountDownLatch cdl = new CountDownLatch(limit);
+ DebugAsyncListener[] listeners = new DebugAsyncListener[limit];
+ Cancellable[] cancellables = new Cancellable[limit];
+ try (HttpSolrClientBase client = b.build()) {
+
+ // ensure the collection is empty to start
+ client.deleteByQuery(COLLECTION_1, "*:*");
+ client.commit(COLLECTION_1);
+ QueryResponse qr =
+ client.query(
+ COLLECTION_1,
+ new MapSolrParams(Collections.singletonMap("q", "*:*")),
+ SolrRequest.METHOD.POST);
+ assertEquals(0, qr.getResults().getNumFound());
+
+ for (int i = 0; i < limit; i++) {
+ listeners[i] = new DebugAsyncListener(cdl);
+ UpdateRequest ur = new UpdateRequest();
+ ur.add("id", "KEY-" + i);
+ ur.setMethod(SolrRequest.METHOD.POST);
+ client.asyncRequest(ur, COLLECTION_1, listeners[i]);
+ }
+ cdl.await(1, TimeUnit.MINUTES);
+ client.commit(COLLECTION_1);
+
+ // check that the correct number of documents were added
+ qr =
+ client.query(
+ COLLECTION_1,
+ new MapSolrParams(Collections.singletonMap("q", "*:*")),
+ SolrRequest.METHOD.POST);
+ assertEquals(limit, qr.getResults().getNumFound());
+
+ // clean up
+ client.deleteByQuery(COLLECTION_1, "*:*");
+ client.commit(COLLECTION_1);
+ }
+ }
+
+ protected void testQueryAsync() throws Exception {
+ ResponseParser rp = new XMLResponseParser();
+ DebugServlet.clear();
+ DebugServlet.addResponseHeader("Content-Type", "application/xml; charset=UTF-8");
+ String url = getBaseUrl() + DEBUG_SERVLET_PATH;
+ HttpSolrClientBuilderBase<?, ?> b =
+ builder(url, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT).withResponseParser(rp);
+ int limit = 10;
+ CountDownLatch cdl = new CountDownLatch(limit);
+ DebugAsyncListener[] listeners = new DebugAsyncListener[limit];
+ Cancellable[] cancellables = new Cancellable[limit];
+ try (HttpSolrClientBase client = b.build()) {
+ for (int i = 0; i < limit; i++) {
+ DebugServlet.responseBodyByQueryFragment.put(
+ ("id=KEY-" + i),
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<response><result name=\"response\" numFound=\"2\" start=\"1\" numFoundExact=\"true\"><doc><str name=\"id\">KEY-"
+ + i
+ + "</str></doc></result></response>");
+ QueryRequest query =
+ new QueryRequest(new MapSolrParams(Collections.singletonMap("id", "KEY-" + i)));
+ query.setMethod(SolrRequest.METHOD.GET);
+ listeners[i] = new DebugAsyncListener(cdl);
+ client.asyncRequest(query, null, listeners[i]);
+ }
+ cdl.await(1, TimeUnit.MINUTES);
+ }
+
+ for (int i = 0; i < limit; i++) {
+ NamedList<Object> result = listeners[i].onSuccessResult;
+ SolrDocumentList sdl = (SolrDocumentList) result.get("response");
+ assertEquals(2, sdl.getNumFound());
+ assertEquals(1, sdl.getStart());
+ assertTrue(sdl.getNumFoundExact());
+ assertEquals(1, sdl.size());
+ assertEquals(1, sdl.iterator().next().size());
+ assertEquals("KEY-" + i, sdl.iterator().next().get("id"));
+
+ assertNull(listeners[i].onFailureResult);
+ assertTrue(listeners[i].onStartCalled);
+ }
+ }
+
+ protected DebugAsyncListener testAsyncExceptionBase() throws Exception {
+ ResponseParser rp = new XMLResponseParser();
+ DebugServlet.clear();
+ DebugServlet.addResponseHeader("Content-Type", "Wrong Content Type!");
+ String url = getBaseUrl() + DEBUG_SERVLET_PATH;
+ HttpSolrClientBuilderBase<?, ?> b =
+ builder(url, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT).withResponseParser(rp);
+ CountDownLatch cdl = new CountDownLatch(1);
+ DebugAsyncListener listener = new DebugAsyncListener(cdl);
+ try (HttpSolrClientBase client = b.build()) {
+ QueryRequest query = new QueryRequest(new MapSolrParams(Collections.singletonMap("id", "1")));
+ client.asyncRequest(query, COLLECTION_1, listener);
+ cdl.await(1, TimeUnit.MINUTES);
+ }
+
+ assertNotNull(listener.onFailureResult);
+ assertTrue(listener.onStartCalled);
+ assertNull(listener.onSuccessResult);
+ return listener;
+ }
}