SOLR-14763 SolrJ HTTP/2 Async API using CompletableFuture (update for 2024) (#2402)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 386c7d8..067eef0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -132,6 +132,9 @@
* SOLR-17204: REPLACENODE now supports the source node not being live (Vincent Primault)
+* SOLR-14763: Add a CompletableFuture based asynchronous API to Http2SolrClient, HttpJdkSolrClient and LBHttp2SolrClient.
+ The previous asynchronous API is deprecated. (Rishi Sankar, James Dyer)
+
Optimizations
---------------------
* SOLR-17144: Close searcherExecutor thread per core after 1 minute (Pierre Salagnac, Christine Poerschke)
diff --git a/solr/core/src/java/org/apache/solr/core/CancellableQueryTracker.java b/solr/core/src/java/org/apache/solr/core/CancellableQueryTracker.java
index 2581574..f3b79c3 100644
--- a/solr/core/src/java/org/apache/solr/core/CancellableQueryTracker.java
+++ b/solr/core/src/java/org/apache/solr/core/CancellableQueryTracker.java
@@ -22,14 +22,14 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.CancellableCollector;
/** Tracks metadata for active queries and provides methods for access */
public class CancellableQueryTracker {
// TODO: This needs to become a time aware storage model
- private final Map<String, Cancellable> activeCancellableQueries = new ConcurrentHashMap<>();
+ private final Map<String, CancellableCollector> activeCancellableQueries =
+ new ConcurrentHashMap<>();
private final Map<String, String> activeQueriesGenerated = new ConcurrentHashMap<>();
/**
@@ -81,7 +81,7 @@
activeCancellableQueries.put(queryID, collector);
}
- public Cancellable getCancellableTask(String queryID) {
+ public CancellableCollector getCancellableTask(String queryID) {
if (queryID == null) {
throw new IllegalArgumentException("Input queryID is null");
}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 9ee8440..1e315f8 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,8 +32,6 @@
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.routing.NoOpReplicaListTransformer;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
-import org.apache.solr.client.solrj.util.AsyncListener;
-import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
@@ -61,7 +60,7 @@
public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
private HttpShardHandlerFactory httpShardHandlerFactory;
- private Map<ShardResponse, Cancellable> responseCancellableMap;
+ private Map<ShardResponse, CompletableFuture<LBSolrClient.Rsp>> responseFutureMap;
private BlockingQueue<ShardResponse> responses;
private AtomicInteger pending;
private Map<String, List<String>> shardToURLs;
@@ -72,7 +71,7 @@
this.lbClient = httpShardHandlerFactory.loadbalancer;
this.pending = new AtomicInteger(0);
this.responses = new LinkedBlockingQueue<>();
- this.responseCancellableMap = new HashMap<>();
+ this.responseFutureMap = new HashMap<>();
// maps "localhost:8983|localhost:7574" to a shuffled
// List("http://localhost:8983","http://localhost:7574")
@@ -155,43 +154,33 @@
return;
}
- // all variables that set inside this listener must be at least volatile
- responseCancellableMap.put(
- srsp,
- this.lbClient.asyncReq(
- lbReq,
- new AsyncListener<>() {
- volatile long startTime = System.nanoTime();
+ long startTime = System.nanoTime();
+ SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
+ if (requestInfo != null) {
+ req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
+ }
- @Override
- public void onStart() {
- SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
- if (requestInfo != null)
- req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
- }
+ CompletableFuture<LBSolrClient.Rsp> future = this.lbClient.requestAsync(lbReq);
+ future.whenComplete(
+ (rsp, throwable) -> {
+ if (rsp != null) {
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer());
+ ssr.elapsedTime =
+ TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ responses.add(srsp);
+ } else if (throwable != null) {
+ ssr.elapsedTime =
+ TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ srsp.setException(throwable);
+ if (throwable instanceof SolrException) {
+ srsp.setResponseCode(((SolrException) throwable).code());
+ }
+ responses.add(srsp);
+ }
+ });
- @Override
- public void onSuccess(LBSolrClient.Rsp rsp) {
- ssr.nl = rsp.getResponse();
- srsp.setShardAddress(rsp.getServer());
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- responses.add(srsp);
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- ssr.elapsedTime =
- TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
- srsp.setException(throwable);
- if (throwable instanceof SolrException) {
- srsp.setResponseCode(((SolrException) throwable).code());
- }
- responses.add(srsp);
- }
- }));
+ responseFutureMap.put(srsp, future);
}
/** Subclasses could modify the request based on the shard */
@@ -229,7 +218,7 @@
try {
while (pending.get() > 0) {
ShardResponse rsp = responses.take();
- responseCancellableMap.remove(rsp);
+ responseFutureMap.remove(rsp);
pending.decrementAndGet();
if (bailOnError && rsp.getException() != null)
@@ -251,11 +240,11 @@
@Override
public void cancelAll() {
- for (Cancellable cancellable : responseCancellableMap.values()) {
- cancellable.cancel();
+ for (CompletableFuture<LBSolrClient.Rsp> future : responseFutureMap.values()) {
+ future.cancel(true);
pending.decrementAndGet();
}
- responseCancellableMap.clear();
+ responseFutureMap.clear();
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/handler/component/QueryCancellationComponent.java b/solr/core/src/java/org/apache/solr/handler/component/QueryCancellationComponent.java
index e846224..9648bb3 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/QueryCancellationComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/QueryCancellationComponent.java
@@ -17,7 +17,7 @@
package org.apache.solr.handler.component;
import java.io.IOException;
-import org.apache.solr.client.solrj.util.Cancellable;
+import org.apache.solr.search.CancellableCollector;
/** Responsible for handling query cancellation requests */
public class QueryCancellationComponent extends SearchComponent {
@@ -44,7 +44,7 @@
throw new RuntimeException("Null query UUID seen");
}
- Cancellable cancellableTask =
+ CancellableCollector cancellableTask =
rb.req.getCore().getCancellableQueryTracker().getCancellableTask(cancellationUUID);
if (cancellableTask != null) {
diff --git a/solr/core/src/java/org/apache/solr/search/CancellableCollector.java b/solr/core/src/java/org/apache/solr/search/CancellableCollector.java
index 3e3c6bb..46ece0e 100644
--- a/solr/core/src/java/org/apache/solr/search/CancellableCollector.java
+++ b/solr/core/src/java/org/apache/solr/search/CancellableCollector.java
@@ -24,10 +24,9 @@
import org.apache.lucene.search.FilterLeafCollector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreMode;
-import org.apache.solr.client.solrj.util.Cancellable;
/** Allows a query to be cancelled */
-public class CancellableCollector implements Collector, Cancellable {
+public class CancellableCollector implements Collector {
/** Thrown when a query gets cancelled */
public static class QueryCancelledException extends RuntimeException {}
@@ -67,7 +66,6 @@
return collector.scoreMode();
}
- @Override
public void cancel() {
isQueryCancelled.compareAndSet(false, true);
}
diff --git a/solr/core/src/test/org/apache/solr/handler/TestHttpRequestId.java b/solr/core/src/test/org/apache/solr/handler/TestHttpRequestId.java
index c992064..eedd238 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestHttpRequestId.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestHttpRequestId.java
@@ -19,14 +19,16 @@
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.SolrPing;
-import org.apache.solr.client.solrj.util.AsyncListener;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -79,19 +81,6 @@
final String key = "mdcContextTestKey" + System.nanoTime();
final String value = "TestHttpRequestId" + System.nanoTime();
- AsyncListener<NamedList<Object>> listener =
- new AsyncListener<>() {
- @Override
- public void onSuccess(NamedList<Object> t) {
- assertEquals(value, MDC.get(key));
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- assertEquals(value, MDC.get(key));
- }
- };
-
try (LogListener reqLog =
LogListener.debug(Http2SolrClient.class).substring("response processing")) {
// client setup needs to be same as HttpShardHandlerFactory
@@ -104,18 +93,33 @@
workQueue,
new SolrNamedThreadFactory("httpShardExecutor"),
false);
+ CompletableFuture<NamedList<Object>> cf;
try (Http2SolrClient client =
new Http2SolrClient.Builder(getBaseUrl())
.withDefaultCollection(collection)
.withExecutor(commExecutor)
.build()) {
MDC.put(key, value);
- client.asyncRequest(new SolrPing(), null, listener);
+ cf =
+ client
+ .requestAsync(new SolrPing(), null)
+ .whenComplete((nl, e) -> assertEquals(value, MDC.get(key)));
} finally {
ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
MDC.remove(key);
}
+ try {
+ cf.get(1, TimeUnit.MINUTES);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ } catch (TimeoutException te) {
+ fail("timeout");
+ } catch (ExecutionException ee) {
+ // ignore
+ }
+
// expecting 2 events: success|failed, completed
Queue<LogEvent> reqLogQueue = reqLog.getQueue();
assertEquals(2, reqLogQueue.size());
diff --git a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
deleted file mode 100644
index 399134d..0000000
--- a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.update;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketException;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.lucene.tests.util.LuceneTestCase;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.Http2SolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.util.AsyncListener;
-import org.apache.solr.client.solrj.util.Cancellable;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.util.NamedList;
-
-public class MockingHttp2SolrClient extends Http2SolrClient {
-
- protected MockingHttp2SolrClient(String serverBaseUrl, Http2SolrClient.Builder builder) {
- super(serverBaseUrl, builder);
- }
-
- public enum Exp {
- CONNECT_EXCEPTION,
- SOCKET_EXCEPTION,
- BAD_REQUEST
- }
-
- private volatile Exp exp = null;
- private boolean oneExpPerReq;
-
- @SuppressWarnings({"rawtypes"})
- private Set<SolrRequest> reqGotException;
-
- public static class Builder extends Http2SolrClient.Builder {
- private boolean oneExpPerReq = false;
-
- public Builder(UpdateShardHandlerConfig config) {
- super();
- this.withConnectionTimeout(config.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS);
- this.withIdleTimeout(config.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS);
- }
-
- @Override
- public MockingHttp2SolrClient build() {
- return new MockingHttp2SolrClient(null, this);
- }
- }
-
- private Exception exception() {
- switch (exp) {
- case CONNECT_EXCEPTION:
- return new ConnectException();
- case SOCKET_EXCEPTION:
- return new SocketException();
- case BAD_REQUEST:
- return new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Bad Request");
- default:
- break;
- }
- return null;
- }
-
- @Override
- public NamedList<Object> request(
- @SuppressWarnings({"rawtypes"}) SolrRequest request, String collection)
- throws SolrServerException, IOException {
- if (request instanceof UpdateRequest) {
- UpdateRequest ur = (UpdateRequest) request;
- if (!ur.getDeleteQuery().isEmpty()) return super.request(request, collection);
- }
-
- if (exp != null) {
- if (oneExpPerReq) {
- if (reqGotException.contains(request)) return super.request(request, collection);
- else reqGotException.add(request);
- }
-
- Exception e = exception();
- if (e instanceof IOException) {
- if (LuceneTestCase.random().nextBoolean()) {
- throw (IOException) e;
- } else {
- throw new SolrServerException(e);
- }
- } else if (e instanceof SolrServerException) {
- throw (SolrServerException) e;
- } else {
- throw new SolrServerException(e);
- }
- }
-
- return super.request(request, collection);
- }
-
- @Override
- public Cancellable asyncRequest(
- SolrRequest<?> request, String collection, AsyncListener<NamedList<Object>> asyncListener) {
- if (request instanceof UpdateRequest) {
- UpdateRequest ur = (UpdateRequest) request;
- // won't throw exception if request is DBQ
- if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) {
- return super.asyncRequest(request, collection, asyncListener);
- }
- }
-
- if (exp != null) {
- if (oneExpPerReq) {
- if (reqGotException.contains(request)) {
- return super.asyncRequest(request, collection, asyncListener);
- } else reqGotException.add(request);
- }
-
- Exception e = exception();
- if (e instanceof IOException) {
- if (LuceneTestCase.random().nextBoolean()) {
- e = new SolrServerException(e);
- }
- }
- asyncListener.onFailure(e);
- }
-
- return super.asyncRequest(request, collection, asyncListener);
- }
-}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java
index a134e2f..31ff4f2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrClient.java
@@ -1182,7 +1182,7 @@
throws SolrServerException, IOException;
/**
- * Execute a request against a Solr server
+ * Execute a request against a Solr server using the default collection
*
* @param request the request to execute
* @return a {@link NamedList} containing the response from the server
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index c0277b1..549bb45 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -31,6 +31,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
@@ -49,7 +50,6 @@
import org.apache.solr.client.solrj.util.AsyncListener;
import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
@@ -417,67 +417,95 @@
outStream.flush();
}
- @SuppressWarnings("StaticAssignmentOfThrowable")
- private static final Exception CANCELLED_EXCEPTION = new Exception();
-
- private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {};
-
@Override
public Cancellable asyncRequest(
SolrRequest<?> solrRequest,
String collection,
AsyncListener<NamedList<Object>> asyncListener) {
- MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
- Request req;
+ asyncListener.onStart();
+ CompletableFuture<NamedList<Object>> cf =
+ requestAsync(solrRequest, collection)
+ .whenComplete(
+ (nl, t) -> {
+ if (t != null) {
+ asyncListener.onFailure(t);
+ } else {
+ asyncListener.onSuccess(nl);
+ }
+ });
+ return () -> cf.cancel(true);
+ }
+
+ @Override
+ public CompletableFuture<NamedList<Object>> requestAsync(
+ final SolrRequest<?> solrRequest, String collection) {
+ if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest)) {
+ collection = defaultCollection;
+ }
+ MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
+ CompletableFuture<NamedList<Object>> future = new CompletableFuture<>();
+ final MakeRequestReturnValue mrrv;
+ final String url;
try {
- String url = getRequestPath(solrRequest, collection);
- InputStreamResponseListener listener =
- new InputStreamReleaseTrackingResponseListener() {
- @Override
- public void onHeaders(Response response) {
- super.onHeaders(response);
- executor.execute(
- () -> {
- InputStream is = getInputStream();
- try {
- NamedList<Object> body =
- processErrorsAndResponse(solrRequest, response, is, url);
- mdcCopyHelper.onBegin(null);
- log.debug("response processing success");
- asyncListener.onSuccess(body);
- } catch (RemoteSolrException e) {
- if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
+ url = getRequestPath(solrRequest, collection);
+ mrrv = makeRequest(solrRequest, url, true);
+ } catch (SolrServerException | IOException e) {
+ future.completeExceptionally(e);
+ return future;
+ }
+ final ResponseParser parser =
+ solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser();
+ mrrv.request
+ .onRequestQueued(asyncTracker.queuedListener)
+ .onComplete(asyncTracker.completeListener)
+ .send(
+ new InputStreamResponseListener() {
+ @Override
+ public void onHeaders(Response response) {
+ super.onHeaders(response);
+ InputStreamResponseListener listener = this;
+ executor.execute(
+ () -> {
+ InputStream is = listener.getInputStream();
+ try {
+ NamedList<Object> body =
+ processErrorsAndResponse(solrRequest, response, is, url);
+ mdcCopyHelper.onBegin(null);
+ log.debug("response processing success");
+ future.complete(body);
+ } catch (RemoteSolrException | SolrServerException e) {
mdcCopyHelper.onBegin(null);
log.debug("response processing failed", e);
- asyncListener.onFailure(e);
+ future.completeExceptionally(e);
+ } finally {
+ log.debug("response processing completed");
+ mdcCopyHelper.onComplete(null);
}
- } catch (SolrServerException e) {
- mdcCopyHelper.onBegin(null);
- log.debug("response processing failed", e);
- asyncListener.onFailure(e);
- } finally {
- log.debug("response processing completed");
- mdcCopyHelper.onComplete(null);
- }
- });
- }
-
- @Override
- public void onFailure(Response response, Throwable failure) {
- super.onFailure(response, failure);
- if (failure != CANCELLED_EXCEPTION) {
- asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
+ });
}
- }
- };
- asyncListener.onStart();
- req = makeRequestAndSend(solrRequest, url, listener, true);
- } catch (SolrServerException | IOException e) {
- asyncListener.onFailure(e);
- return FAILED_MAKING_REQUEST_CANCELLABLE;
+
+ @Override
+ public void onFailure(Response response, Throwable failure) {
+ super.onFailure(response, failure);
+ future.completeExceptionally(
+ new SolrServerException(failure.getMessage(), failure));
+ }
+ });
+ future.exceptionally(
+ (error) -> {
+ mrrv.request.abort(error);
+ return null;
+ });
+
+ if (mrrv.contentWriter != null) {
+ try (var output = mrrv.requestContent.getOutputStream()) {
+ mrrv.contentWriter.write(output);
+ } catch (IOException ioe) {
+ future.completeExceptionally(ioe);
+ }
}
- return () -> req.abort(CANCELLED_EXCEPTION);
+ return future;
}
@Override
@@ -596,25 +624,49 @@
}
}
+ private static class MakeRequestReturnValue {
+ final Request request;
+ final RequestWriter.ContentWriter contentWriter;
+ final OutputStreamRequestContent requestContent;
+
+ MakeRequestReturnValue(
+ Request request,
+ RequestWriter.ContentWriter contentWriter,
+ OutputStreamRequestContent requestContent) {
+ this.request = request;
+ this.contentWriter = contentWriter;
+ this.requestContent = requestContent;
+ }
+
+ MakeRequestReturnValue(Request request) {
+ this.request = request;
+ this.contentWriter = null;
+ this.requestContent = null;
+ }
+ }
+
private Request makeRequestAndSend(
SolrRequest<?> solrRequest, String url, InputStreamResponseListener listener, boolean isAsync)
throws IOException, SolrServerException {
+ return sendRequest(makeRequest(solrRequest, url, isAsync), listener);
+ }
+ private MakeRequestReturnValue makeRequest(
+ SolrRequest<?> solrRequest, String url, boolean isAsync)
+ throws IOException, SolrServerException {
ModifiableSolrParams wparams = initalizeSolrParams(solrRequest, responseParser(solrRequest));
if (SolrRequest.METHOD.GET == solrRequest.getMethod()) {
validateGetRequest(solrRequest);
var r = httpClient.newRequest(url + wparams.toQueryString()).method(HttpMethod.GET);
decorateRequest(r, solrRequest, isAsync);
- r.send(listener);
- return r;
+ return new MakeRequestReturnValue(r);
}
if (SolrRequest.METHOD.DELETE == solrRequest.getMethod()) {
var r = httpClient.newRequest(url + wparams.toQueryString()).method(HttpMethod.DELETE);
decorateRequest(r, solrRequest, isAsync);
- r.send(listener);
- return r;
+ return new MakeRequestReturnValue(r);
}
if (SolrRequest.METHOD.POST == solrRequest.getMethod()
@@ -632,11 +684,7 @@
var content = new OutputStreamRequestContent(contentWriter.getContentType());
var r = httpClient.newRequest(url + wparams.toQueryString()).method(method).body(content);
decorateRequest(r, solrRequest, isAsync);
- r.send(listener);
- try (var output = content.getOutputStream()) {
- contentWriter.write(output);
- }
- return r;
+ return new MakeRequestReturnValue(r, contentWriter, content);
} else if (streams == null || isMultipart) {
// send server list and request list as query string params
@@ -645,25 +693,35 @@
Request req = httpClient.newRequest(url + queryParams.toQueryString()).method(method);
var r = fillContentStream(req, streams, wparams, isMultipart);
decorateRequest(r, solrRequest, isAsync);
- r.send(listener);
- return r;
+ return new MakeRequestReturnValue(r);
} else {
- // If is has one stream, it is the post body, put the params in the URL
+ // If it has one stream, it is the post body, put the params in the URL
ContentStream contentStream = streams.iterator().next();
var content =
new InputStreamRequestContent(
contentStream.getContentType(), contentStream.getStream());
var r = httpClient.newRequest(url + wparams.toQueryString()).method(method).body(content);
decorateRequest(r, solrRequest, isAsync);
- r.send(listener);
- return r;
+ return new MakeRequestReturnValue(r);
}
}
throw new SolrServerException("Unsupported method: " + solrRequest.getMethod());
}
+ private Request sendRequest(MakeRequestReturnValue mrrv, InputStreamResponseListener listener)
+ throws IOException, SolrServerException {
+ mrrv.request.send(listener);
+
+ if (mrrv.contentWriter != null) {
+ try (var output = mrrv.requestContent.getOutputStream()) {
+ mrrv.contentWriter.write(output);
+ }
+ }
+ return mrrv.request;
+ }
+
private Request fillContentStream(
Request req,
Collection<ContentStream> streams,
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
index 1474dc1..96366d0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpJdkSolrClient.java
@@ -141,33 +141,41 @@
SolrRequest<?> solrRequest,
String collection,
AsyncListener<NamedList<Object>> asyncListener) {
+ asyncListener.onStart();
+ CompletableFuture<NamedList<Object>> cf =
+ requestAsync(solrRequest, collection)
+ .whenComplete(
+ (nl, t) -> {
+ if (t != null) {
+ asyncListener.onFailure(t);
+ } else {
+ asyncListener.onSuccess(nl);
+ }
+ });
+
+ return new HttpSolrClientCancellable(cf);
+ }
+
+ @Override
+ public CompletableFuture<NamedList<Object>> requestAsync(
+ final SolrRequest<?> solrRequest, String collection) {
try {
PreparedRequest pReq = prepareRequest(solrRequest, collection);
- asyncListener.onStart();
- CompletableFuture<NamedList<Object>> response =
- httpClient
- .sendAsync(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream())
- .thenApply(
- httpResponse -> {
- try {
- return processErrorsAndResponse(
- solrRequest, pReq.parserToUse, httpResponse, pReq.url);
- } catch (SolrServerException e) {
- throw new RuntimeException(e);
- }
- })
- .whenComplete(
- (nl, t) -> {
- if (t != null) {
- asyncListener.onFailure(t);
- } else {
- asyncListener.onSuccess(nl);
- }
- });
- return new HttpSolrClientCancellable(response);
+ return httpClient
+ .sendAsync(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream())
+ .thenApply(
+ httpResponse -> {
+ try {
+ return processErrorsAndResponse(
+ solrRequest, pReq.parserToUse, httpResponse, pReq.url);
+ } catch (SolrServerException e) {
+ throw new RuntimeException(e);
+ }
+ });
} catch (Exception e) {
- asyncListener.onFailure(e);
- return () -> {};
+ CompletableFuture<NamedList<Object>> cf = new CompletableFuture<>();
+ cf.completeExceptionally(e);
+ return cf;
}
}
@@ -417,7 +425,7 @@
private void decorateRequest(HttpRequest.Builder reqb, SolrRequest<?> solrRequest) {
if (requestTimeoutMillis > 0) {
reqb.timeout(Duration.of(requestTimeoutMillis, ChronoUnit.MILLIS));
- } else {
+ } else if (idleTimeoutMillis > 0) {
reqb.timeout(Duration.of(idleTimeoutMillis, ChronoUnit.MILLIS));
}
reqb.header("User-Agent", USER_AGENT);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java
index a491a11..95e448d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClientBase.java
@@ -33,6 +33,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
@@ -85,7 +88,11 @@
} else {
this.serverBaseUrl = null;
}
- this.idleTimeoutMillis = builder.idleTimeoutMillis;
+ if (builder.idleTimeoutMillis != null) {
+ this.idleTimeoutMillis = builder.idleTimeoutMillis;
+ } else {
+ this.idleTimeoutMillis = -1;
+ }
this.basicAuthAuthorizationStr = builder.basicAuthAuthorizationStr;
if (builder.requestWriter != null) {
this.requestWriter = builder.requestWriter;
@@ -371,19 +378,45 @@
protected abstract void updateDefaultMimeTypeForParser();
/**
- * Execute an asynchronous request to a Solr collection
- *
+ * @deprecated use {@link #requestAsync(SolrRequest, String)}.
* @param solrRequest the request to perform
* @param collection if null the default collection is used
* @param asyncListener callers should provide an implementation to handle events: start, success,
* exception
* @return Cancellable allowing the caller to attempt cancellation
*/
+ @Deprecated
public abstract Cancellable asyncRequest(
SolrRequest<?> solrRequest,
String collection,
AsyncListener<NamedList<Object>> asyncListener);
+ /**
+ * Execute an asynchronous request against a Solr server for a given collection.
+ *
+ * @param request the request to execute
+ * @param collection the collection to execute the request against
+ * @return a {@link CompletableFuture} that tracks the progress of the async request. Supports
+ * cancelling requests via {@link CompletableFuture#cancel(boolean)}, adding callbacks/error
+ * handling using {@link CompletableFuture#whenComplete(BiConsumer)} and {@link
+ * CompletableFuture#exceptionally(Function)} methods, and other CompletableFuture
+ * functionality. Will complete exceptionally in case of either an {@link IOException} or
+ * {@link SolrServerException} during the request. Once completed, the CompletableFuture will
+ * contain a {@link NamedList} with the response from the server.
+ */
+ public abstract CompletableFuture<NamedList<Object>> requestAsync(
+ final SolrRequest<?> request, String collection);
+
+ /**
+ * Execute an asynchronous request against a Solr server using the default collection.
+ *
+ * @param request the request to execute
+ * @return a {@link CompletableFuture} see {@link #requestAsync(SolrRequest, String)}.
+ */
+ public CompletableFuture<NamedList<Object>> requestAsync(final SolrRequest<?> request) {
+ return requestAsync(request, null);
+ }
+
public boolean isV2ApiRequest(final SolrRequest<?> request) {
return request instanceof V2Request || request.getPath().contains("/____v2");
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
index be81d38..ec44802 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
@@ -24,8 +24,8 @@
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
@@ -126,20 +126,54 @@
return solrClient.getUrlParamNames();
}
+ /**
+ * Execute an asynchronous request against a one or more hosts for a given collection.
+ *
+ * @param req the wrapped request to perform
+ * @param asyncListener callers should provide an implementation to handle events: start, success,
+ * exception
+ * @return Cancellable allowing the caller to attempt cancellation
+ * @deprecated Use {@link #requestAsync(Req)}.
+ */
+ @Deprecated
public Cancellable asyncReq(Req req, AsyncListener<Rsp> asyncListener) {
+ asyncListener.onStart();
+ CompletableFuture<Rsp> cf =
+ requestAsync(req)
+ .whenComplete(
+ (rsp, t) -> {
+ if (t != null) {
+ asyncListener.onFailure(t);
+ } else {
+ asyncListener.onSuccess(rsp);
+ }
+ });
+ return () -> cf.cancel(true);
+ }
+
+ /**
+ * Execute an asynchronous request against one or more hosts for a given collection. The passed-in
+ * Req object includes a List of Endpoints. This method always begins with the first Endpoint in
+ * the list and if unsuccessful tries each in turn until the request is successful. Consequently,
+ * this method does not actually Load Balance. It is up to the caller to shuffle the List of
+ * Endpoints if Load Balancing is desired.
+ *
+ * @param req the wrapped request to perform
+ * @return a {@link CompletableFuture} that tracks the progress of the async request.
+ */
+ public CompletableFuture<Rsp> requestAsync(Req req) {
+ CompletableFuture<Rsp> apiFuture = new CompletableFuture<>();
Rsp rsp = new Rsp();
boolean isNonRetryable =
req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
EndpointIterator it = new EndpointIterator(req, zombieServers);
- asyncListener.onStart();
- final AtomicBoolean cancelled = new AtomicBoolean(false);
- AtomicReference<Cancellable> currentCancellable = new AtomicReference<>();
+ AtomicReference<CompletableFuture<NamedList<Object>>> currentFuture = new AtomicReference<>();
RetryListener retryListener =
new RetryListener() {
@Override
public void onSuccess(Rsp rsp) {
- asyncListener.onSuccess(rsp);
+ apiFuture.complete(rsp);
}
@Override
@@ -149,48 +183,42 @@
try {
url = it.nextOrError(e);
} catch (SolrServerException ex) {
- asyncListener.onFailure(e);
+ apiFuture.completeExceptionally(e);
return;
}
- try {
- MDC.put("LBSolrClient.url", url.toString());
- synchronized (cancelled) {
- if (cancelled.get()) {
- return;
- }
- Cancellable cancellable =
- doRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this);
- currentCancellable.set(cancellable);
- }
- } finally {
- MDC.remove("LBSolrClient.url");
+ MDC.put("LBSolrClient.url", url.toString());
+ if (!apiFuture.isCancelled()) {
+ CompletableFuture<NamedList<Object>> future =
+ doAsyncRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this);
+ currentFuture.set(future);
}
} else {
- asyncListener.onFailure(e);
+ apiFuture.completeExceptionally(e);
}
}
};
try {
- Cancellable cancellable =
- doRequest(
+ CompletableFuture<NamedList<Object>> future =
+ doAsyncRequest(
it.nextOrError(),
req,
rsp,
isNonRetryable,
it.isServingZombieServer(),
retryListener);
- currentCancellable.set(cancellable);
+ currentFuture.set(future);
} catch (SolrServerException e) {
- asyncListener.onFailure(e);
+ apiFuture.completeExceptionally(e);
+ return apiFuture;
}
- return () -> {
- synchronized (cancelled) {
- cancelled.set(true);
- if (currentCancellable.get() != null) {
- currentCancellable.get().cancel();
- }
- }
- };
+ apiFuture.exceptionally(
+ (error) -> {
+ if (apiFuture.isCancelled()) {
+ currentFuture.get().cancel(true);
+ }
+ return null;
+ });
+ return apiFuture;
}
private interface RetryListener {
@@ -199,73 +227,88 @@
void onFailure(Exception e, boolean retryReq);
}
- private Cancellable doRequest(
+ private CompletableFuture<NamedList<Object>> doAsyncRequest(
Endpoint endpoint,
Req req,
Rsp rsp,
boolean isNonRetryable,
boolean isZombie,
RetryListener listener) {
- rsp.server = endpoint.toString();
- req.getRequest().setBasePath(endpoint.toString());
- return ((Http2SolrClient) getClient(endpoint))
- .asyncRequest(
- req.getRequest(),
- null,
- new AsyncListener<>() {
- @Override
- public void onSuccess(NamedList<Object> result) {
- rsp.rsp = result;
- if (isZombie) {
- zombieServers.remove(endpoint);
- }
- listener.onSuccess(rsp);
- }
+ String baseUrl = endpoint.toString();
+ rsp.server = baseUrl;
+ req.getRequest().setBasePath(baseUrl);
+ CompletableFuture<NamedList<Object>> future =
+ ((Http2SolrClient) getClient(endpoint)).requestAsync(req.getRequest());
+ future.whenComplete(
+ (result, throwable) -> {
+ if (!future.isCompletedExceptionally()) {
+ onSuccessfulRequest(result, endpoint, rsp, isZombie, listener);
+ } else if (!future.isCancelled()) {
+ onFailedRequest(throwable, endpoint, isNonRetryable, isZombie, listener);
+ }
+ });
+ return future;
+ }
- @Override
- public void onFailure(Throwable oe) {
- try {
- throw (Exception) oe;
- } catch (BaseHttpSolrClient.RemoteExecutionException e) {
- listener.onFailure(e, false);
- } catch (SolrException e) {
- // we retry on 404 or 403 or 503 or 500
- // unless it's an update - then we only retry on connect exception
- if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
- listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
- } else {
- // Server is alive but the request was likely malformed or invalid
- if (isZombie) {
- zombieServers.remove(endpoint);
- }
- listener.onFailure(e, false);
- }
- } catch (SocketException e) {
- if (!isNonRetryable || e instanceof ConnectException) {
- listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
- } else {
- listener.onFailure(e, false);
- }
- } catch (SocketTimeoutException e) {
- if (!isNonRetryable) {
- listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
- } else {
- listener.onFailure(e, false);
- }
- } catch (SolrServerException e) {
- Throwable rootCause = e.getRootCause();
- if (!isNonRetryable && rootCause instanceof IOException) {
- listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
- } else if (isNonRetryable && rootCause instanceof ConnectException) {
- listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
- } else {
- listener.onFailure(e, false);
- }
- } catch (Exception e) {
- listener.onFailure(new SolrServerException(e), false);
- }
- }
- });
+ private void onSuccessfulRequest(
+ NamedList<Object> result,
+ Endpoint endpoint,
+ Rsp rsp,
+ boolean isZombie,
+ RetryListener listener) {
+ rsp.rsp = result;
+ if (isZombie) {
+ zombieServers.remove(endpoint.toString());
+ }
+ listener.onSuccess(rsp);
+ }
+
+ private void onFailedRequest(
+ Throwable oe,
+ Endpoint endpoint,
+ boolean isNonRetryable,
+ boolean isZombie,
+ RetryListener listener) {
+ try {
+ throw (Exception) oe;
+ } catch (BaseHttpSolrClient.RemoteExecutionException e) {
+ listener.onFailure(e, false);
+ } catch (SolrException e) {
+ // we retry on 404 or 403 or 503 or 500
+ // unless it's an update - then we only retry on connect exception
+ if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
+ listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
+ } else {
+ // Server is alive but the request was likely malformed or invalid
+ if (isZombie) {
+ zombieServers.remove(endpoint.toString());
+ }
+ listener.onFailure(e, false);
+ }
+ } catch (SocketException e) {
+ if (!isNonRetryable || e instanceof ConnectException) {
+ listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
+ } else {
+ listener.onFailure(e, false);
+ }
+ } catch (SocketTimeoutException e) {
+ if (!isNonRetryable) {
+ listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
+ } else {
+ listener.onFailure(e, false);
+ }
+ } catch (SolrServerException e) {
+ Throwable rootCause = e.getRootCause();
+ if (!isNonRetryable && rootCause instanceof IOException) {
+ listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
+ } else if (isNonRetryable && rootCause instanceof ConnectException) {
+ listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
+ } else {
+ listener.onFailure(e, false);
+ }
+ } catch (Exception e) {
+ listener.onFailure(new SolrServerException(e), false);
+ }
}
public static class Builder {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
index 9e00e16..90566e4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -265,6 +265,7 @@
while (it.hasNext()) {
endpoint = it.next();
// if the server is currently a zombie, just skip to the next one
+ // TODO: zombieServers key is String not Endpoint.
EndpointWrapper wrapper = zombieServers.get(endpoint);
if (wrapper != null) {
final int numDeadServersToTry = req.getNumDeadServersToTry();
@@ -487,6 +488,7 @@
req.getRequest().setBasePath(baseUrl.toString());
rsp.rsp = getClient(baseUrl).request(req.getRequest(), (String) null);
if (isZombie) {
+ // TODO: zombieServers key is String not Endpoint.
zombieServers.remove(baseUrl);
}
} catch (BaseHttpSolrClient.RemoteExecutionException e) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java
index 7e135ac..690c558 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AsyncListener.java
@@ -21,7 +21,9 @@
* Listener for async requests
*
* @param <T> The result type returned by the {@code onSuccess} method
+ * @deprecated Use the async variants that return CompletableFuture.
*/
+@Deprecated
public interface AsyncListener<T> {
/** Callback method invoked before processing the request */
default void onStart() {}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
index 5269112..2dff867 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
@@ -20,7 +20,10 @@
/**
* The return type for solrJ asynchronous requests, providing a mechanism whereby callers may
* request cancellation.
+ *
+ * @deprecated Use the async variants that return CompletableFuture.
*/
+@Deprecated
public interface Cancellable {
/**
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugAsyncListener.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugAsyncListener.java
index b3eaf5f..f6db888 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugAsyncListener.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/DebugAsyncListener.java
@@ -17,17 +17,18 @@
package org.apache.solr.client.solrj.impl;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
+import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.util.AsyncListener;
import org.apache.solr.common.util.NamedList;
import org.junit.Assert;
-public class DebugAsyncListener implements AsyncListener<NamedList<Object>> {
+@Deprecated
+public class DebugAsyncListener
+ implements AsyncListener<NamedList<Object>>, PauseableHttpSolrClient {
- private final CountDownLatch cdl;
-
- private final Semaphore wait = new Semaphore(1);
+ private final CountDownLatch latch;
public volatile boolean onStartCalled;
@@ -37,8 +38,8 @@
public volatile Throwable onFailureResult = null;
- public DebugAsyncListener(CountDownLatch cdl) {
- this.cdl = cdl;
+ public DebugAsyncListener(CountDownLatch latch) {
+ this.latch = latch;
}
@Override
@@ -46,18 +47,6 @@
onStartCalled = true;
}
- public void pause() {
- try {
- wait.acquire();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
-
- public void unPause() {
- wait.release();
- }
-
@Override
public void onSuccess(NamedList<Object> entries) {
pause();
@@ -65,7 +54,7 @@
if (latchCounted) {
Assert.fail("either 'onSuccess' or 'onFailure' should be called exactly once.");
}
- cdl.countDown();
+ latch.countDown();
latchCounted = true;
unPause();
}
@@ -77,8 +66,14 @@
if (latchCounted) {
Assert.fail("either 'onSuccess' or 'onFailure' should be called exactly once.");
}
- cdl.countDown();
+ latch.countDown();
latchCounted = true;
unPause();
}
+
+ @Override
+ public CompletableFuture<NamedList<Object>> requestAsync(
+ SolrRequest<?> solrRequest, String collection) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
index a8831fd..620c240 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
@@ -18,6 +18,7 @@
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
@@ -31,6 +32,7 @@
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.eclipse.jetty.client.WWWAuthenticationProtocolHandler;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -264,18 +266,81 @@
}
@Test
+ public void testDeprecatedAsyncGet() throws Exception {
+ super.testQueryAsync(true);
+ }
+
+ @Test
public void testAsyncGet() throws Exception {
- super.testQueryAsync();
+ super.testQueryAsync(false);
+ }
+
+ @Test
+ public void testDeprecatedAsyncPost() throws Exception {
+ super.testUpdateAsync(true);
}
@Test
public void testAsyncPost() throws Exception {
- super.testUpdateAsync();
+ super.testUpdateAsync(false);
+ }
+
+ @Test
+ public void testDeprecatedAsyncException() throws Exception {
+ super.testAsyncExceptionBase(true);
}
@Test
public void testAsyncException() throws Exception {
- super.testAsyncExceptionBase();
+ super.testAsyncExceptionBase(false);
+ }
+
+ @Test
+ public void testAsyncAndCancel() throws Exception {
+ String url = getBaseUrl() + DEBUG_SERVLET_PATH;
+ Http2SolrClient.Builder b =
+ new Http2SolrClient.Builder(url)
+ .withConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)
+ .withResponseParser(new XMLResponseParser());
+ try (PausableHttp2SolrClient client = new PausableHttp2SolrClient(url, b)) {
+ super.testAsyncAndCancel(client);
+ }
+ }
+
+ public static class PausableHttp2SolrClient extends Http2SolrClient
+ implements PauseableHttpSolrClient {
+
+ protected PausableHttp2SolrClient(String serverBaseUrl, Builder builder) {
+ super(serverBaseUrl, builder);
+ }
+
+ @Override
+ protected NamedList<Object> processErrorsAndResponse(
+ int httpStatus,
+ String responseReason,
+ String responseMethod,
+ ResponseParser processor,
+ InputStream is,
+ String mimeType,
+ String encoding,
+ boolean isV2Api,
+ String urlExceptionMessage)
+ throws SolrServerException {
+ pause();
+ var nl =
+ super.processErrorsAndResponse(
+ httpStatus,
+ responseReason,
+ responseMethod,
+ processor,
+ is,
+ mimeType,
+ encoding,
+ isV2Api,
+ urlExceptionMessage);
+ unPause();
+ return nl;
+ }
}
@Test
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpJdkSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpJdkSolrClientTest.java
index 81d1188..5cbfaa3 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpJdkSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpJdkSolrClientTest.java
@@ -18,6 +18,7 @@
package org.apache.solr.client.solrj.impl;
import java.io.IOException;
+import java.io.InputStream;
import java.net.CookieHandler;
import java.net.CookieManager;
import java.net.Socket;
@@ -199,18 +200,28 @@
}
@Test
+ public void testDeprecatedAsyncGet() throws Exception {
+ super.testQueryAsync(true);
+ }
+
+ @Test
public void testAsyncGet() throws Exception {
- super.testQueryAsync();
+ super.testQueryAsync(false);
+ }
+
+ @Test
+ public void testDeprecatedAsyncPost() throws Exception {
+ super.testUpdateAsync(true);
}
@Test
public void testAsyncPost() throws Exception {
- super.testUpdateAsync();
+ super.testUpdateAsync(false);
}
@Test
- public void testAsyncException() throws Exception {
- DebugAsyncListener listener = super.testAsyncExceptionBase();
+ public void testDeprecatedAsyncException() throws Exception {
+ DebugAsyncListener listener = super.testAsyncExceptionBase(true);
assertTrue(listener.onFailureResult instanceof CompletionException);
CompletionException ce = (CompletionException) listener.onFailureResult;
assertTrue(ce.getCause() instanceof BaseHttpSolrClient.RemoteSolrException);
@@ -218,7 +229,58 @@
}
@Test
+ public void testAsyncException() throws Exception {
+ super.testAsyncExceptionBase(false);
+ }
+
+ @Test
public void testAsyncAndCancel() throws Exception {
+ String url = getBaseUrl() + DEBUG_SERVLET_PATH;
+ HttpJdkSolrClient.Builder b =
+ new HttpJdkSolrClient.Builder(url).withResponseParser(new XMLResponseParser());
+ try (PausableHttpJdkSolrClient client = new PausableHttpJdkSolrClient(url, b)) {
+ super.testAsyncAndCancel(client);
+ }
+ }
+
+ public static class PausableHttpJdkSolrClient extends HttpJdkSolrClient
+ implements PauseableHttpSolrClient {
+
+ protected PausableHttpJdkSolrClient(String serverBaseUrl, Builder builder) {
+ super(serverBaseUrl, builder);
+ }
+
+ @Override
+ protected NamedList<Object> processErrorsAndResponse(
+ int httpStatus,
+ String responseReason,
+ String responseMethod,
+ ResponseParser processor,
+ InputStream is,
+ String mimeType,
+ String encoding,
+ boolean isV2Api,
+ String urlExceptionMessage)
+ throws SolrServerException {
+ pause();
+ var nl =
+ super.processErrorsAndResponse(
+ httpStatus,
+ responseReason,
+ responseMethod,
+ processor,
+ is,
+ mimeType,
+ encoding,
+ isV2Api,
+ urlExceptionMessage);
+ unPause();
+ return nl;
+ }
+ }
+
+ @Test
+ public void testDeprecatedAsyncAndCancel() throws Exception {
ResponseParser rp = new XMLResponseParser();
DebugServlet.clear();
DebugServlet.addResponseHeader("Content-Type", "application/xml; charset=UTF-8");
@@ -226,8 +288,8 @@
"", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<response />");
String url = getBaseUrl() + DEBUG_SERVLET_PATH;
HttpJdkSolrClient.Builder b = builder(url).withResponseParser(rp);
- CountDownLatch cdl = new CountDownLatch(0);
- DebugAsyncListener listener = new DebugAsyncListener(cdl);
+ CountDownLatch latch = new CountDownLatch(0);
+ DebugAsyncListener listener = new DebugAsyncListener(latch);
Cancellable cancelMe = null;
try (HttpJdkSolrClient client = b.build()) {
QueryRequest query = new QueryRequest(new MapSolrParams(Collections.singletonMap("id", "1")));
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java
index 0897e2e..76b5f07 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/HttpSolrClientTestBase.java
@@ -22,13 +22,17 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Locale;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrJettyTestBase;
import org.apache.solr.client.solrj.ResponseParser;
@@ -540,15 +544,18 @@
"No authorization headers expected. Headers: " + DebugServlet.headers, authorizationHeader);
}
- protected void testUpdateAsync() throws Exception {
+ protected void testUpdateAsync(boolean useDeprecatedApi) throws Exception {
ResponseParser rp = new XMLResponseParser();
String url = getBaseUrl();
HttpSolrClientBuilderBase<?, ?> b =
builder(url, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT).withResponseParser(rp);
int limit = 10;
- CountDownLatch cdl = new CountDownLatch(limit);
- DebugAsyncListener[] listeners = new DebugAsyncListener[limit];
- Cancellable[] cancellables = new Cancellable[limit];
+
+ DebugAsyncListener[] listeners = new DebugAsyncListener[limit]; // Deprecated API use
+ Cancellable[] cancellables = new Cancellable[limit]; // Deprecated API use
+
+ CountDownLatch latch = new CountDownLatch(limit);
+
try (HttpSolrClientBase client = b.build()) {
// ensure the collection is empty to start
@@ -562,13 +569,18 @@
assertEquals(0, qr.getResults().getNumFound());
for (int i = 0; i < limit; i++) {
- listeners[i] = new DebugAsyncListener(cdl);
UpdateRequest ur = new UpdateRequest();
ur.add("id", "KEY-" + i);
ur.setMethod(SolrRequest.METHOD.POST);
- client.asyncRequest(ur, COLLECTION_1, listeners[i]);
+
+ if (useDeprecatedApi) {
+ listeners[i] = new DebugAsyncListener(latch);
+ client.asyncRequest(ur, COLLECTION_1, listeners[i]);
+ } else {
+ client.requestAsync(ur, COLLECTION_1).whenComplete((nl, e) -> latch.countDown());
+ }
}
- cdl.await(1, TimeUnit.MINUTES);
+ latch.await(1, TimeUnit.MINUTES);
client.commit(COLLECTION_1);
// check that the correct number of documents were added
@@ -585,7 +597,7 @@
}
}
- protected void testQueryAsync() throws Exception {
+ protected void testQueryAsync(boolean useDeprecatedApi) throws Exception {
ResponseParser rp = new XMLResponseParser();
DebugServlet.clear();
DebugServlet.addResponseHeader("Content-Type", "application/xml; charset=UTF-8");
@@ -593,9 +605,13 @@
HttpSolrClientBuilderBase<?, ?> b =
builder(url, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT).withResponseParser(rp);
int limit = 10;
- CountDownLatch cdl = new CountDownLatch(limit);
- DebugAsyncListener[] listeners = new DebugAsyncListener[limit];
- Cancellable[] cancellables = new Cancellable[limit];
+
+ CountDownLatch latch = new CountDownLatch(limit); // Deprecated API use
+ DebugAsyncListener[] listeners = new DebugAsyncListener[limit]; // Deprecated API use
+ Cancellable[] cancellables = new Cancellable[limit]; // Deprecated API use
+
+ List<CompletableFuture<NamedList<Object>>> futures = new ArrayList<>();
+
try (HttpSolrClientBase client = b.build()) {
for (int i = 0; i < limit; i++) {
DebugServlet.responseBodyByQueryFragment.put(
@@ -606,45 +622,107 @@
QueryRequest query =
new QueryRequest(new MapSolrParams(Collections.singletonMap("id", "KEY-" + i)));
query.setMethod(SolrRequest.METHOD.GET);
- listeners[i] = new DebugAsyncListener(cdl);
- client.asyncRequest(query, null, listeners[i]);
+ if (useDeprecatedApi) {
+ listeners[i] = new DebugAsyncListener(latch);
+ client.asyncRequest(query, null, listeners[i]);
+ } else {
+ futures.add(client.requestAsync(query));
+ }
}
- cdl.await(1, TimeUnit.MINUTES);
- }
+ if (useDeprecatedApi) {
+ latch.await(1, TimeUnit.MINUTES);
+ }
- for (int i = 0; i < limit; i++) {
- NamedList<Object> result = listeners[i].onSuccessResult;
- SolrDocumentList sdl = (SolrDocumentList) result.get("response");
- assertEquals(2, sdl.getNumFound());
- assertEquals(1, sdl.getStart());
- assertTrue(sdl.getNumFoundExact());
- assertEquals(1, sdl.size());
- assertEquals(1, sdl.iterator().next().size());
- assertEquals("KEY-" + i, sdl.iterator().next().get("id"));
+ for (int i = 0; i < limit; i++) {
+ NamedList<Object> result;
+ if (useDeprecatedApi) {
+ result = listeners[i].onSuccessResult;
+ } else {
+ result = futures.get(i).get(1, TimeUnit.MINUTES);
+ }
+ SolrDocumentList sdl = (SolrDocumentList) result.get("response");
+ assertEquals(2, sdl.getNumFound());
+ assertEquals(1, sdl.getStart());
+ assertTrue(sdl.getNumFoundExact());
+ assertEquals(1, sdl.size());
+ assertEquals(1, sdl.iterator().next().size());
+ assertEquals("KEY-" + i, sdl.iterator().next().get("id"));
- assertNull(listeners[i].onFailureResult);
- assertTrue(listeners[i].onStartCalled);
+ if (useDeprecatedApi) {
+ assertNull(listeners[i].onFailureResult);
+ assertTrue(listeners[i].onStartCalled);
+ } else {
+ assertFalse(futures.get(i).isCompletedExceptionally());
+ }
+ }
}
}
- protected DebugAsyncListener testAsyncExceptionBase() throws Exception {
+ protected DebugAsyncListener testAsyncExceptionBase(boolean useDeprecatedApi) throws Exception {
ResponseParser rp = new XMLResponseParser();
DebugServlet.clear();
DebugServlet.addResponseHeader("Content-Type", "Wrong Content Type!");
String url = getBaseUrl() + DEBUG_SERVLET_PATH;
HttpSolrClientBuilderBase<?, ?> b =
builder(url, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT).withResponseParser(rp);
- CountDownLatch cdl = new CountDownLatch(1);
- DebugAsyncListener listener = new DebugAsyncListener(cdl);
+
+ CompletableFuture<NamedList<Object>> future = null;
+ ExecutionException ee = null;
+
+ CountDownLatch latch = new CountDownLatch(1); // Deprecated API use
+ DebugAsyncListener listener = new DebugAsyncListener(latch); // Deprecated API use
+
try (HttpSolrClientBase client = b.build()) {
QueryRequest query = new QueryRequest(new MapSolrParams(Collections.singletonMap("id", "1")));
- client.asyncRequest(query, COLLECTION_1, listener);
- cdl.await(1, TimeUnit.MINUTES);
+ if (useDeprecatedApi) {
+ client.asyncRequest(query, COLLECTION_1, listener);
+ } else {
+ future = client.requestAsync(query, COLLECTION_1);
+ }
+ if (useDeprecatedApi) {
+ latch.await(1, TimeUnit.MINUTES);
+ } else {
+ try {
+ future.get(1, TimeUnit.MINUTES);
+ fail("Should have thrown ExecutionException");
+ } catch (ExecutionException ee1) {
+ ee = ee1;
+ }
+ }
}
-
- assertNotNull(listener.onFailureResult);
- assertTrue(listener.onStartCalled);
- assertNull(listener.onSuccessResult);
+ if (useDeprecatedApi) {
+ assertNotNull(listener.onFailureResult);
+ assertTrue(listener.onStartCalled);
+ assertNull(listener.onSuccessResult);
+ } else {
+ assertTrue(future.isCompletedExceptionally());
+ assertTrue(ee.getCause() instanceof BaseHttpSolrClient.RemoteSolrException);
+ assertTrue(ee.getMessage(), ee.getMessage().contains("mime type"));
+ }
return listener;
}
+
+ protected void testAsyncAndCancel(PauseableHttpSolrClient client) throws Exception {
+ DebugServlet.clear();
+ DebugServlet.addResponseHeader("Content-Type", "application/xml; charset=UTF-8");
+ DebugServlet.responseBodyByQueryFragment.put(
+ "", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<response />");
+
+ QueryRequest query = new QueryRequest(new MapSolrParams(Collections.singletonMap("id", "1")));
+
+ // We are using a version of the class under test that will wait for us before processing the
+ // response.
+ // This way we can ensure our test will always cancel the request before it finishes.
+ client.pause();
+
+ // Make the request then immediately cancel it!
+ CompletableFuture<NamedList<Object>> future = client.requestAsync(query, "collection1");
+ future.cancel(true);
+
+ // We are safe to unpause our client, having guaranteed that our cancel was before everything
+ // completed.
+ client.unPause();
+
+ assertTrue(future.isCancelled());
+ }
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBHttp2SolrClientTest.java
index ebf3a5d..8e29050 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBHttp2SolrClientTest.java
@@ -16,9 +16,25 @@
*/
package org.apache.solr.client.solrj.impl;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.solr.SolrTestCase;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.util.AsyncListener;
+import org.apache.solr.client.solrj.util.Cancellable;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.junit.Test;
/** Test the LBHttp2SolrClient. */
@@ -51,4 +67,247 @@
http2SolrClient.getUrlParamNames().toArray());
}
}
+
+ @Test
+ public void testAsyncDeprecated() {
+ testAsync(true);
+ }
+
+ @Test
+ public void testAsync() {
+ testAsync(false);
+ }
+
+ @Test
+ public void testAsyncWithFailures() {
+
+ // This demonstrates that the failing endpoint always gets retried, and it is up to the user
+ // to remove any failing nodes if desired.
+
+ LBSolrClient.Endpoint ep1 = new LBSolrClient.Endpoint("http://endpoint.one");
+ LBSolrClient.Endpoint ep2 = new LBSolrClient.Endpoint("http://endpoint.two");
+ List<LBSolrClient.Endpoint> endpointList = List.of(ep1, ep2);
+
+ Http2SolrClient.Builder b =
+ new Http2SolrClient.Builder("http://base.url").withConnectionTimeout(10, TimeUnit.SECONDS);
+ ;
+ try (MockHttp2SolrClient client = new MockHttp2SolrClient("http://base.url", b);
+ LBHttp2SolrClient testClient = new LBHttp2SolrClient.Builder(client, ep1, ep2).build()) {
+
+ for (int j = 0; j < 2; j++) {
+ // j: first time Endpoint One will retrun error code 500.
+ // second time Endpoint One will be healthy
+
+ String basePathToSucceed;
+ if (j == 0) {
+ client.basePathToFail = ep1.getBaseUrl();
+ basePathToSucceed = ep2.getBaseUrl();
+ } else {
+ client.basePathToFail = ep2.getBaseUrl();
+ basePathToSucceed = ep1.getBaseUrl();
+ }
+
+ for (int i = 0; i < 10; i++) {
+ // i: we'll try 10 times to see if it behaves the same every time.
+
+ QueryRequest queryRequest = new QueryRequest(new MapSolrParams(Map.of("q", "" + i)));
+ LBSolrClient.Req req = new LBSolrClient.Req(queryRequest, endpointList);
+ String iterMessage = "iter j/i " + j + "/" + i;
+ try {
+ testClient.requestAsync(req).get(1, TimeUnit.MINUTES);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ } catch (TimeoutException | ExecutionException e) {
+ fail(iterMessage + " Response ended in failure: " + e);
+ }
+ if (j == 0) {
+ // The first endpoint gives an exception, so it retries.
+ assertEquals(iterMessage, 2, client.lastBasePaths.size());
+
+ String failedBasePath = client.lastBasePaths.remove(0);
+ assertEquals(iterMessage, client.basePathToFail, failedBasePath);
+ } else {
+ // The first endpoint does not give the exception, it doesn't retry.
+ assertEquals(iterMessage, 1, client.lastBasePaths.size());
+ }
+ String successBasePath = client.lastBasePaths.remove(0);
+ assertEquals(iterMessage, basePathToSucceed, successBasePath);
+ }
+ }
+ }
+ }
+
+ private void testAsync(boolean useDeprecatedApi) {
+ LBSolrClient.Endpoint ep1 = new LBSolrClient.Endpoint("http://endpoint.one");
+ LBSolrClient.Endpoint ep2 = new LBSolrClient.Endpoint("http://endpoint.two");
+ List<LBSolrClient.Endpoint> endpointList = List.of(ep1, ep2);
+
+ Http2SolrClient.Builder b =
+ new Http2SolrClient.Builder("http://base.url").withConnectionTimeout(10, TimeUnit.SECONDS);
+ try (MockHttp2SolrClient client = new MockHttp2SolrClient("http://base.url", b);
+ LBHttp2SolrClient testClient = new LBHttp2SolrClient.Builder(client, ep1, ep2).build()) {
+
+ int limit = 10; // For simplicity use an even limit
+
+ CountDownLatch latch = new CountDownLatch(limit); // deprecated API use
+ List<LBTestAsyncListener> listeners = new ArrayList<>(); // deprecated API use
+ List<CompletableFuture<LBSolrClient.Rsp>> responses = new ArrayList<>();
+
+ for (int i = 0; i < limit; i++) {
+ QueryRequest queryRequest = new QueryRequest(new MapSolrParams(Map.of("q", "" + i)));
+ LBSolrClient.Req req = new LBSolrClient.Req(queryRequest, endpointList);
+ if (useDeprecatedApi) {
+ LBTestAsyncListener listener = new LBTestAsyncListener(latch);
+ listeners.add(listener);
+ testClient.asyncReq(req, listener);
+ } else {
+ responses.add(testClient.requestAsync(req));
+ }
+ }
+
+ if (useDeprecatedApi) {
+ try {
+ // This is just a formality. This is a single-threaded test.
+ latch.await(1, TimeUnit.MINUTES);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ }
+ }
+
+ QueryRequest[] queryRequests = new QueryRequest[limit];
+ int numEndpointOne = 0;
+ int numEndpointTwo = 0;
+ for (int i = 0; i < limit; i++) {
+ SolrRequest<?> lastSolrReq = client.lastSolrRequests.get(i);
+ assertTrue(lastSolrReq instanceof QueryRequest);
+ QueryRequest lastQueryReq = (QueryRequest) lastSolrReq;
+ int index = Integer.parseInt(lastQueryReq.getParams().get("q"));
+ assertNull("Found same request twice: " + index, queryRequests[index]);
+ queryRequests[index] = lastQueryReq;
+ if (lastQueryReq.getBasePath().equals(ep1.toString())) {
+ numEndpointOne++;
+ } else if (lastQueryReq.getBasePath().equals(ep2.toString())) {
+ numEndpointTwo++;
+ }
+ NamedList<Object> lastResponse;
+ if (useDeprecatedApi) {
+ LBTestAsyncListener lastAsyncListener = listeners.get(index);
+ assertTrue(lastAsyncListener.onStartCalled);
+ assertNull(lastAsyncListener.failure);
+ assertNotNull(lastAsyncListener.success);
+ lastResponse = lastAsyncListener.success.getResponse();
+ } else {
+ LBSolrClient.Rsp lastRsp = null;
+ try {
+ lastRsp = responses.get(index).get();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ fail("interrupted");
+ } catch (ExecutionException ee) {
+ fail("Response " + index + " ended in failure: " + ee);
+ }
+ lastResponse = lastRsp.getResponse();
+ }
+
+ // The Mock will return {"response": index}.
+ assertEquals("" + index, lastResponse.get("response"));
+ }
+
+ // It is the user's responsibility to shuffle the endpoints when using
+ // async. LB Http Solr Client always will try the passed-in endpoints
+ // in order. In this case, endpoint 1 gets all the requests!
+ assertEquals(limit, numEndpointOne);
+ assertEquals(0, numEndpointTwo);
+
+ assertEquals(limit, client.lastSolrRequests.size());
+ assertEquals(limit, client.lastCollections.size());
+ }
+ }
+
+ @Deprecated(forRemoval = true)
+ public static class LBTestAsyncListener implements AsyncListener<LBSolrClient.Rsp> {
+ private final CountDownLatch latch;
+ private volatile boolean countDownCalled = false;
+ public boolean onStartCalled = false;
+ public LBSolrClient.Rsp success = null;
+ public Throwable failure = null;
+
+ public LBTestAsyncListener(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void onStart() {
+ onStartCalled = true;
+ }
+
+ @Override
+ public void onSuccess(LBSolrClient.Rsp entries) {
+ success = entries;
+ countdown();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ failure = throwable;
+ countdown();
+ }
+
+ private void countdown() {
+ if (countDownCalled) {
+ throw new IllegalStateException("Already counted down.");
+ }
+ latch.countDown();
+ countDownCalled = true;
+ }
+ }
+
+ public static class MockHttp2SolrClient extends Http2SolrClient {
+
+ public List<SolrRequest<?>> lastSolrRequests = new ArrayList<>();
+
+ public List<String> lastBasePaths = new ArrayList<>();
+
+ public List<String> lastCollections = new ArrayList<>();
+
+ public String basePathToFail = null;
+
+ protected MockHttp2SolrClient(String serverBaseUrl, Builder builder) {
+ // TODO: Consider creating an interface for Http*SolrClient
+ // so mocks can Implement, not Extend, and not actually need to
+ // build an (unused) client
+ super(serverBaseUrl, builder);
+ }
+
+ @Override
+ public Cancellable asyncRequest(
+ SolrRequest<?> solrRequest,
+ String collection,
+ AsyncListener<NamedList<Object>> asyncListener) {
+ throw new UnsupportedOperationException("do not use deprecated method.");
+ }
+
+ @Override
+ public CompletableFuture<NamedList<Object>> requestAsync(
+ final SolrRequest<?> solrRequest, String collection) {
+ CompletableFuture<NamedList<Object>> cf = new CompletableFuture<>();
+ lastSolrRequests.add(solrRequest);
+ lastBasePaths.add(solrRequest.getBasePath());
+ lastCollections.add(collection);
+ if (solrRequest.getBasePath().equals(basePathToFail)) {
+ cf.completeExceptionally(
+ new SolrException(SolrException.ErrorCode.SERVER_ERROR, "We should retry this."));
+ } else {
+ cf.complete(generateResponse(solrRequest));
+ }
+ return cf;
+ }
+
+ private NamedList<Object> generateResponse(SolrRequest<?> solrRequest) {
+ String id = solrRequest.getParams().get("q");
+ return new NamedList<>(Collections.singletonMap("response", id));
+ }
+ }
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/PauseableHttpSolrClient.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/PauseableHttpSolrClient.java
new file mode 100644
index 0000000..09a4794
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/PauseableHttpSolrClient.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.common.util.NamedList;
+
+public interface PauseableHttpSolrClient {
+
+ Semaphore wait = new Semaphore(1);
+
+ public default void pause() {
+ try {
+ wait.acquire();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public default void unPause() {
+ wait.release();
+ }
+
+ public CompletableFuture<NamedList<Object>> requestAsync(
+ final SolrRequest<?> solrRequest, String collection);
+}