SOLR-16505: Switch Recovery/Replication to Jetty HTTP2 (#2276)
UpdateShardHandler
* switch "recoveryOnlyHttpClient" to Http2SolrClient
RecoveryStrategy:
* Use Http2SolrClient
* Simplify cancelation of prep recovery command
IndexFetcher:
* Use Http2SolrClient
* Ensure the entire stream is consumed to avoid a connection reset
Http2SolrClient:
* make HttpListenerFactory configurable (used for internal purposes)
---------
Co-authored-by: iamsanjay <sanjaydutt.india@yahoo.com>
Co-authored-by: David Smiley <dsmiley@apache.org>
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 751e5a9..a507c10 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -130,6 +130,9 @@
---------------------
* SOLR-17248: Refactor ZK related SolrCli tools to separate SolrZkClient and CloudSolrClient instantiation/usage (Lamine Idjeraoui via Eric Pugh)
+* SOLR-16505: Use Jetty HTTP2 for index replication and other "recovery" operations
+ (Sanjay Dutt, David Smiley)
+
================== 9.6.0 ==================
New Features
---------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 9ea8373..d945375 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -22,17 +22,17 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
-import org.apache.http.client.methods.HttpUriRequest;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -124,7 +124,7 @@
private int retries;
private boolean recoveringAfterStartup;
private CoreContainer cc;
- private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
+ private volatile FutureTask<NamedList<Object>> prevSendPreRecoveryHttpUriRequest;
private final Replica.Type replicaType;
private CoreDescriptor coreDescriptor;
@@ -175,25 +175,18 @@
this.recoveringAfterStartup = recoveringAfterStartup;
}
- /** Builds a new HttpSolrClient for use in recovery. Caller must close */
- private HttpSolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) {
- // workaround for SOLR-13605: get the configured timeouts & set them directly
- // (even though getRecoveryOnlyHttpClient() already has them set)
+ private Http2SolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) {
final UpdateShardHandlerConfig cfg = cc.getConfig().getUpdateShardHandlerConfig();
- return (new HttpSolrClient.Builder(baseUrl)
+ return new Http2SolrClient.Builder(baseUrl)
.withDefaultCollection(leaderCoreName)
- .withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS)
- .withSocketTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS)
- .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient()));
+ .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient());
}
// make sure any threads stop retrying
@Override
public final void close() {
close = true;
- if (prevSendPreRecoveryHttpUriRequest != null) {
- prevSendPreRecoveryHttpUriRequest.abort();
- }
+ cancelPrepRecoveryCmd();
log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
}
@@ -634,11 +627,7 @@
.getCollection(cloudDesc.getCollectionName())
.getSlice(cloudDesc.getShardId());
- try {
- prevSendPreRecoveryHttpUriRequest.abort();
- } catch (NullPointerException e) {
- // okay
- }
+ cancelPrepRecoveryCmd();
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
@@ -894,7 +883,6 @@
private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
throws SolrServerException, IOException, InterruptedException, ExecutionException {
-
WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(zkController.getNodeName());
@@ -915,18 +903,19 @@
int readTimeout =
conflictWaitMs
+ Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "8000"));
- try (HttpSolrClient client =
+ try (SolrClient client =
recoverySolrClientBuilder(
leaderBaseUrl,
null) // leader core omitted since client only used for 'admin' request
- .withSocketTimeout(readTimeout, TimeUnit.MILLISECONDS)
+ .withIdleTimeout(readTimeout, TimeUnit.MILLISECONDS)
.build()) {
- HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
- prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
-
+ prevSendPreRecoveryHttpUriRequest = new FutureTask<>(() -> client.request(prepCmd));
log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd);
-
- mrr.future.get();
+ prevSendPreRecoveryHttpUriRequest.run();
}
}
+
+ private void cancelPrepRecoveryCmd() {
+ Optional.ofNullable(prevSendPreRecoveryHttpUriRequest).ifPresent(req -> req.cancel(true));
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index b347565..69a2ddd 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -84,7 +84,6 @@
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.InflaterInputStream;
-import org.apache.http.client.HttpClient;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
@@ -97,9 +96,9 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
+import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
@@ -128,12 +127,12 @@
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.security.AllowListUrlChecker;
import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.IndexOutputOutputStream;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
-import org.apache.solr.util.stats.InstrumentedHttpRequestExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,7 +185,7 @@
boolean fetchFromLeader = false;
- private final HttpClient myHttpClient;
+ private final SolrClient solrClient;
private Integer connTimeout;
@@ -261,22 +260,22 @@
}
}
- private static HttpClient createHttpClient(
- SolrCore core,
- String httpBasicAuthUser,
- String httpBasicAuthPassword,
- boolean useCompression) {
- final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
- httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
- httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
- httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
- // no metrics, just tracing
- InstrumentedHttpRequestExecutor executor = new InstrumentedHttpRequestExecutor(null);
- return HttpClientUtil.createClient(
- httpClientParams,
- core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyConnectionManager(),
- true,
- executor);
+ // It's crucial not to remove the authentication credentials as they are essential for User
+ // managed replication.
+ // GitHub PR #2276
+ private SolrClient createSolrClient(
+ SolrCore core, String httpBasicAuthUser, String httpBasicAuthPassword, String leaderBaseUrl) {
+ final UpdateShardHandler updateShardHandler = core.getCoreContainer().getUpdateShardHandler();
+ Http2SolrClient httpClient =
+ new Http2SolrClient.Builder(leaderBaseUrl)
+ .withHttpClient(updateShardHandler.getRecoveryOnlyHttpClient())
+ .withListenerFactory(
+ updateShardHandler.getRecoveryOnlyHttpClient().getListenerFactory())
+ .withBasicAuthCredentials(httpBasicAuthUser, httpBasicAuthPassword)
+ .withIdleTimeout(soTimeout, TimeUnit.MILLISECONDS)
+ .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
+ .build();
+ return httpClient;
}
public IndexFetcher(
@@ -318,12 +317,10 @@
if (soTimeout == -1) {
soTimeout = getParameter(initArgs, HttpClientUtil.PROP_SO_TIMEOUT, 120000, null);
}
-
String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
- myHttpClient =
- createHttpClient(
- solrCore, httpBasicAuthUser, httpBasicAuthPassword, useExternalCompression);
+ solrClient =
+ createSolrClient(solrCore, httpBasicAuthUser, httpBasicAuthPassword, leaderBaseUrl);
}
private void setLeaderCoreUrl(String leaderCoreUrl) {
@@ -381,16 +378,10 @@
params.set(CommonParams.WT, JAVABIN);
params.set(CommonParams.QT, ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);
-
+ req.setBasePath(leaderBaseUrl);
// TODO modify to use shardhandler
- try (SolrClient client =
- new Builder(leaderBaseUrl)
- .withHttpClient(myHttpClient)
- .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
- .withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
- .build()) {
-
- return client.request(req, leaderCoreName);
+ try {
+ return solrClient.request(req, leaderCoreName);
} catch (SolrServerException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
}
@@ -408,15 +399,10 @@
params.set(CommonParams.WT, JAVABIN);
params.set(CommonParams.QT, ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);
-
+ req.setBasePath(leaderBaseUrl);
// TODO modify to use shardhandler
- try (SolrClient client =
- new HttpSolrClient.Builder(leaderBaseUrl)
- .withHttpClient(myHttpClient)
- .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
- .withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
- .build()) {
- NamedList<?> response = client.request(req, leaderCoreName);
+ try {
+ NamedList<?> response = solrClient.request(req, leaderCoreName);
List<Map<String, Object>> files = (List<Map<String, Object>>) response.get(CMD_GET_FILE_LIST);
if (files != null) filesToDownload = Collections.synchronizedList(files);
@@ -1805,10 +1791,10 @@
private void fetch() throws Exception {
try {
while (true) {
- int result;
- try (FastInputStream is = getStream()) {
+ try (FastInputStream fis = getStream()) {
+ int result;
// fetch packets one by one in a single request
- result = fetchPackets(is);
+ result = fetchPackets(fis);
if (result == 0 || result == NO_CONTENT) {
return;
}
@@ -1834,18 +1820,25 @@
byte[] longbytes = new byte[8];
try {
while (true) {
+ if (fis.peek() == -1) {
+ if (bytesDownloaded == 0) {
+ log.warn("No content received for file: {}", fileName);
+ return NO_CONTENT;
+ }
+ return 0;
+ }
if (stop) {
stop = false;
aborted = true;
throw new ReplicationHandlerException("User aborted replication");
}
long checkSumServer = -1;
+
fis.readFully(intbytes);
// read the size of the packet
int packetSize = readInt(intbytes);
if (packetSize <= 0) {
- log.warn("No content received for file: {}", fileName);
- return NO_CONTENT;
+ continue;
}
// TODO consider recoding the remaining logic to not use/need buf[]; instead use the
// internal buffer of fis
@@ -1879,7 +1872,6 @@
log.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName);
// errorCount is always set to zero after a successful packet
errorCount = 0;
- if (bytesDownloaded >= size) return 0;
}
} catch (ReplicationHandlerException e) {
throw e;
@@ -1968,7 +1960,7 @@
private FastInputStream getStream() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
- // //the method is command=filecontent
+ // the method is command=filecontent
params.set(COMMAND, CMD_GET_FILE);
params.set(GENERATION, Long.toString(indexGen));
params.set(CommonParams.QT, ReplicationHandler.PATH);
@@ -1991,17 +1983,13 @@
NamedList<?> response;
InputStream is = null;
-
// TODO use shardhandler
- try (SolrClient client =
- new Builder(leaderBaseUrl)
- .withHttpClient(myHttpClient)
- .withResponseParser(null)
- .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
- .withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
- .build()) {
+ try {
QueryRequest req = new QueryRequest(params);
- response = client.request(req, leaderCoreName);
+ req.setResponseParser(new InputStreamResponseParser(FILE_STREAM));
+ req.setBasePath(leaderBaseUrl);
+ if (useExternalCompression) req.addHeader("Accept-Encoding", "gzip");
+ response = solrClient.request(req, leaderCoreName);
is = (InputStream) response.get("stream");
if (useInternalCompression) {
is = new InflaterInputStream(is);
@@ -2125,21 +2113,15 @@
params.set("follower", false);
params.set(CommonParams.QT, ReplicationHandler.PATH);
+ QueryRequest request = new QueryRequest(params);
+ request.setBasePath(leaderBaseUrl);
// TODO use shardhandler
- try (SolrClient client =
- new HttpSolrClient.Builder(leaderBaseUrl)
- .withHttpClient(myHttpClient)
- .withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
- .withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
- .build()) {
- QueryRequest request = new QueryRequest(params);
- return client.request(request, leaderCoreName);
- }
+ return solrClient.request(request, leaderCoreName);
}
public void destroy() {
abortFetch();
- HttpClientUtil.close(myHttpClient);
+ IOUtils.closeQuietly(solrClient);
}
String getLeaderCoreUrl() {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index b3ab8cb..650a61d 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -75,17 +75,15 @@
private final Http2SolrClient updateOnlyClient;
- private final CloseableHttpClient recoveryOnlyClient;
+ private final Http2SolrClient recoveryOnlyClient;
private final CloseableHttpClient defaultClient;
- private final InstrumentedPoolingHttpClientConnectionManager recoveryOnlyConnectionManager;
-
private final InstrumentedPoolingHttpClientConnectionManager defaultConnectionManager;
private final InstrumentedHttpRequestExecutor httpRequestExecutor;
- private final InstrumentedHttpListenerFactory updateHttpListenerFactory;
+ private final InstrumentedHttpListenerFactory trackHttpSolrMetrics;
private SolrMetricsContext solrMetricsContext;
@@ -93,16 +91,11 @@
private int connectionTimeout = HttpClientUtil.DEFAULT_CONNECT_TIMEOUT;
public UpdateShardHandler(UpdateShardHandlerConfig cfg) {
- recoveryOnlyConnectionManager =
- new InstrumentedPoolingHttpClientConnectionManager(
- HttpClientUtil.getSocketFactoryRegistryProvider().getSocketFactoryRegistry());
defaultConnectionManager =
new InstrumentedPoolingHttpClientConnectionManager(
HttpClientUtil.getSocketFactoryRegistryProvider().getSocketFactoryRegistry());
ModifiableSolrParams clientParams = new ModifiableSolrParams();
if (cfg != null) {
- recoveryOnlyConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
- recoveryOnlyConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
defaultConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections());
defaultConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost());
clientParams.set(HttpClientUtil.PROP_SO_TIMEOUT, cfg.getDistributedSocketTimeout());
@@ -120,10 +113,8 @@
log.debug("Created default UpdateShardHandler HTTP client with params: {}", clientParams);
httpRequestExecutor = new InstrumentedHttpRequestExecutor(getMetricNameStrategy(cfg));
- updateHttpListenerFactory = new InstrumentedHttpListenerFactory(getNameStrategy(cfg));
- recoveryOnlyClient =
- HttpClientUtil.createClient(
- clientParams, recoveryOnlyConnectionManager, false, httpRequestExecutor);
+ trackHttpSolrMetrics = new InstrumentedHttpListenerFactory(getNameStrategy(cfg));
+
defaultClient =
HttpClientUtil.createClient(
clientParams, defaultConnectionManager, false, httpRequestExecutor);
@@ -133,15 +124,24 @@
DistributedUpdateProcessor.DISTRIB_FROM,
DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
Http2SolrClient.Builder updateOnlyClientBuilder = new Http2SolrClient.Builder();
+ Http2SolrClient.Builder recoveryOnlyClientBuilder = new Http2SolrClient.Builder();
if (cfg != null) {
updateOnlyClientBuilder
.withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS)
.withIdleTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS)
.withMaxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost());
+ recoveryOnlyClientBuilder
+ .withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS)
+ .withIdleTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS)
+ .withMaxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost());
}
+
updateOnlyClientBuilder.withTheseParamNamesInTheUrl(urlParamNames);
updateOnlyClient = updateOnlyClientBuilder.build();
- updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
+ updateOnlyClient.addListenerFactory(trackHttpSolrMetrics);
+
+ recoveryOnlyClient = recoveryOnlyClientBuilder.build();
+ recoveryOnlyClient.addListenerFactory(trackHttpSolrMetrics);
ThreadFactory recoveryThreadFactory = new SolrNamedThreadFactory("recoveryExecutor");
if (cfg != null && cfg.getMaxRecoveryThreads() > 0) {
@@ -205,7 +205,7 @@
public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
solrMetricsContext = parentContext.getChildContext(this);
String expandedScope = SolrMetricManager.mkName(scope, getCategory().name());
- updateHttpListenerFactory.initializeMetrics(solrMetricsContext, expandedScope);
+ trackHttpSolrMetrics.initializeMetrics(solrMetricsContext, expandedScope);
defaultConnectionManager.initializeMetrics(solrMetricsContext, expandedScope);
updateExecutor =
MetricUtils.instrumentedExecutorService(
@@ -247,7 +247,7 @@
}
// don't introduce a bug, this client is for recovery ops only!
- public HttpClient getRecoveryOnlyHttpClient() {
+ public Http2SolrClient getRecoveryOnlyHttpClient() {
return recoveryOnlyClient;
}
@@ -264,10 +264,6 @@
return defaultConnectionManager;
}
- public PoolingHttpClientConnectionManager getRecoveryOnlyConnectionManager() {
- return recoveryOnlyConnectionManager;
- }
-
/**
* @return executor for recovery operations
*/
@@ -290,10 +286,9 @@
// do nothing
}
IOUtils.closeQuietly(updateOnlyClient);
- HttpClientUtil.close(recoveryOnlyClient);
+ IOUtils.closeQuietly(recoveryOnlyClient);
HttpClientUtil.close(defaultClient);
defaultConnectionManager.close();
- recoveryOnlyConnectionManager.close();
}
}
@@ -309,5 +304,6 @@
public void setSecurityBuilder(HttpClientBuilderPlugin builder) {
builder.setup(updateOnlyClient);
+ builder.setup(recoveryOnlyClient);
}
}
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-follower-auth.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-follower-auth.xml
new file mode 100644
index 0000000..1635cfb0
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-follower-auth.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+ <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ </updateHandler>
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <bool name="httpCaching">true</bool>
+ </requestHandler>
+
+ <!-- test query parameter defaults -->
+ <requestHandler name="/defaults" class="solr.SearchHandler">
+
+ </requestHandler>
+
+ <!-- test query parameter defaults -->
+ <requestHandler name="/lazy" class="solr.SearchHandler" startup="lazy">
+ </requestHandler>
+
+ <requestHandler name="/replication" class="solr.ReplicationHandler">
+ <lst name="follower">
+ <str name="leaderUrl">http://127.0.0.1:TEST_PORT/solr/collection1</str>
+ <str name="pollInterval">00:00:01</str>
+ <str name="compression">COMPRESSION</str>
+ <str name="httpBasicAuthUser">solr</str>
+ <str name="httpBasicAuthPassword">SolrRocks</str>
+ </lst>
+ </requestHandler>
+
+ <requestDispatcher>
+ <requestParsers multipartUploadLimitInKB="-1"/>
+ <httpCaching lastModifiedFrom="openTime" etagSeed="Solr" never304="false">
+ <cacheControl>max-age=30, public</cacheControl>
+ </httpCaching>
+ </requestDispatcher>
+
+</config>
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryStrategyStressTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryStrategyStressTest.java
new file mode 100644
index 0000000..06c0397
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryStrategyStressTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import com.carrotsearch.randomizedtesting.annotations.Nightly;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudLegacySolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@Nightly
+public class RecoveryStrategyStressTest extends SolrCloudTestCase {
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ cluster = configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
+ }
+
+ @Test
+ public void stressTestRecovery() throws Exception {
+ final String collection = "recoveryStressTest";
+ CollectionAdminRequest.createCollection(collection, "conf", 1, 4)
+ .process(cluster.getSolrClient());
+ waitForState(
+ "Expected a collection with one shard and two replicas", collection, clusterShape(1, 4));
+ final var scheduledExecutorService =
+ Executors.newScheduledThreadPool(1, new SolrNamedThreadFactory("stressTestRecovery"));
+ try (SolrClient solrClient =
+ cluster.basicSolrClientBuilder().withDefaultCollection(collection).build()) {
+ final StoppableIndexingThread indexThread =
+ new StoppableIndexingThread(null, solrClient, "1", true, 10, 1, true);
+
+ final var startAndStopCount = new CountDownLatch(50);
+ final Thread startAndStopRandomReplicas =
+ new Thread(
+ () -> {
+ try {
+ while (startAndStopCount.getCount() > 0) {
+ DocCollection state = getCollectionState(collection);
+ Replica leader = state.getLeader("shard1");
+ Replica replica =
+ getRandomReplica(state.getSlice("shard1"), (r) -> !leader.equals(r));
+
+ JettySolrRunner jetty = cluster.getReplicaJetty(replica);
+ jetty.stop();
+ Thread.sleep(100);
+ jetty.start();
+ startAndStopCount.countDown();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ startAndStopRandomReplicas.start();
+ // index and commit doc after fixed interval of 10 sec
+ scheduledExecutorService.scheduleWithFixedDelay(
+ indexThread, 1000, 10000, TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleWithFixedDelay(
+ () -> {
+ try {
+ new UpdateRequest().commit(solrClient, collection);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (SolrServerException e) {
+ throw new RuntimeException(e);
+ }
+ },
+ 100,
+ 10000,
+ TimeUnit.MILLISECONDS);
+
+ startAndStopCount.await();
+ scheduledExecutorService.shutdownNow();
+ // final commit to make documents visible for replicas
+ new UpdateRequest().commit(solrClient, collection);
+ }
+ cluster.getZkStateReader().waitForState(collection, 120, TimeUnit.SECONDS, clusterShape(1, 4));
+
+ // test that leader and replica have same doc count
+ DocCollection state = getCollectionState(collection);
+ assertShardConsistency(state.getSlice("shard1"), true);
+ }
+
+ private void assertShardConsistency(Slice shard, boolean expectDocs) throws Exception {
+ List<Replica> replicas = shard.getReplicas(r -> r.getState() == Replica.State.ACTIVE);
+ long[] numCounts = new long[replicas.size()];
+ int i = 0;
+ for (Replica replica : replicas) {
+ try (var client =
+ new HttpSolrClient.Builder(replica.getBaseUrl())
+ .withDefaultCollection(replica.getCoreName())
+ .withHttpClient(((CloudLegacySolrClient) cluster.getSolrClient()).getHttpClient())
+ .build()) {
+ numCounts[i] =
+ client.query(new SolrQuery("*:*").add("distrib", "false")).getResults().getNumFound();
+ i++;
+ }
+ }
+ for (int j = 1; j < replicas.size(); j++) {
+ if (numCounts[j] != numCounts[j - 1])
+ fail("Mismatch in counts between replicas"); // TODO improve this!
+ if (numCounts[j] == 0 && expectDocs)
+ fail("Expected docs on shard " + shard.getName() + " but found none");
+ }
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/handler/ReplicationTestHelper.java b/solr/core/src/test/org/apache/solr/handler/ReplicationTestHelper.java
index 892dc67..a9c8142 100644
--- a/solr/core/src/test/org/apache/solr/handler/ReplicationTestHelper.java
+++ b/solr/core/src/test/org/apache/solr/handler/ReplicationTestHelper.java
@@ -103,7 +103,8 @@
if (null != port) {
line = line.replace("TEST_PORT", port.toString());
}
- line = line.replace("COMPRESSION", internalCompression ? "internal" : "false");
+ String externalCompression = LuceneTestCase.random().nextBoolean() ? "external" : "false";
+ line = line.replace("COMPRESSION", internalCompression ? "internal" : externalCompression);
out.write(line);
}
}
diff --git a/solr/core/src/test/org/apache/solr/handler/TestUserManagedReplicationWithAuth.java b/solr/core/src/test/org/apache/solr/handler/TestUserManagedReplicationWithAuth.java
new file mode 100644
index 0000000..b230aad
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/handler/TestUserManagedReplicationWithAuth.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler;
+
+import static org.apache.solr.common.params.CommonParams.JAVABIN;
+import static org.apache.solr.handler.ReplicationHandler.CMD_DISABLE_POLL;
+import static org.apache.solr.handler.ReplicationHandler.CMD_FETCH_INDEX;
+import static org.apache.solr.handler.ReplicationHandler.COMMAND;
+import static org.apache.solr.handler.ReplicationTestHelper.createAndStartJetty;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.HealthCheckRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.embedded.JettySolrRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressSSL
+public class TestUserManagedReplicationWithAuth extends SolrTestCaseJ4 {
+ JettySolrRunner leaderJetty, followerJetty, followerJettyWithAuth;
+ SolrClient leaderClient, followerClient, followerClientWithAuth;
+ ReplicationTestHelper.SolrInstance leader = null, follower = null, followerWithAuth = null;
+
+ private static String user = "solr";
+ private static String pass = "SolrRocks";
+ private static String securityJson =
+ "{\n"
+ + "\"authentication\":{ \n"
+ + " \"blockUnknown\": true, \n"
+ + " \"class\":\"solr.BasicAuthPlugin\",\n"
+ + " \"credentials\":{\"solr\":\"IV0EHq1OnNrj6gvRCwvFwTrZ1+z1oBbnQdiVC3otuq0= Ndd7LKvVBAaZIF0QAVi1ekCfAJXr1GGfLtRUXhgrF8c=\"}, \n"
+ + " \"realm\":\"My Solr users\", \n"
+ + " \"forwardCredentials\": false \n"
+ + "},\n"
+ + "\"authorization\":{\n"
+ + " \"class\":\"solr.RuleBasedAuthorizationPlugin\",\n"
+ + " \"permissions\":[{\"name\":\"security-edit\",\n"
+ + " \"role\":\"admin\"}],\n"
+ + " \"user-role\":{\"solr\":\"admin\"}\n"
+ + "}}";
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ systemSetPropertySolrDisableUrlAllowList("true");
+ // leader with Basic auth enabled via security.json
+ leader =
+ new ReplicationTestHelper.SolrInstance(
+ createTempDir("solr-instance").toFile(), "leader", null);
+ leader.setUp();
+ // Configuring basic auth for Leader
+ Path solrLeaderHome = Path.of(leader.getHomeDir());
+ Files.write(
+ solrLeaderHome.resolve("security.json"), securityJson.getBytes(StandardCharsets.UTF_8));
+ leaderJetty = ReplicationTestHelper.createAndStartJetty(leader);
+ leaderClient =
+ ReplicationTestHelper.createNewSolrClient(
+ buildUrl(leaderJetty.getLocalPort()), DEFAULT_TEST_CORENAME);
+
+ // follower with no basic auth credentials for leader configured.
+ follower =
+ new ReplicationTestHelper.SolrInstance(
+ createTempDir("solr-instance").toFile(), "follower", leaderJetty.getLocalPort());
+ follower.setUp();
+ followerJetty = createAndStartJetty(follower);
+ followerClient =
+ ReplicationTestHelper.createNewSolrClient(
+ buildUrl(followerJetty.getLocalPort()), DEFAULT_TEST_CORENAME);
+
+ // follower with basic auth credentials for leader configured in solrconfig.xml.
+ followerWithAuth =
+ new ReplicationTestHelper.SolrInstance(
+ createTempDir("solr-instance").toFile(), "follower-auth", leaderJetty.getLocalPort());
+ followerWithAuth.setUp();
+ followerJettyWithAuth = createAndStartJetty(followerWithAuth);
+ followerClientWithAuth =
+ ReplicationTestHelper.createNewSolrClient(
+ buildUrl(followerJettyWithAuth.getLocalPort()), DEFAULT_TEST_CORENAME);
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (null != leaderJetty) {
+ leaderJetty.stop();
+ leaderJetty = null;
+ }
+ if (null != followerJetty) {
+ followerJetty.stop();
+ followerJetty = null;
+ }
+ if (null != followerJettyWithAuth) {
+ followerJettyWithAuth.stop();
+ followerJettyWithAuth = null;
+ }
+ if (null != leaderClient) {
+ leaderClient.close();
+ leaderClient = null;
+ }
+ if (null != followerClient) {
+ followerClient.close();
+ followerClient = null;
+ }
+ if (null != followerClientWithAuth) {
+ followerClientWithAuth.close();
+ followerClientWithAuth = null;
+ }
+ }
+
+ private <T extends SolrRequest<? extends SolrResponse>> T withBasicAuth(T req) {
+ req.setBasicAuthCredentials(user, pass);
+ return req;
+ }
+
+ @Test
+ public void doTestManualFetchIndexWithAuthEnabled() throws Exception {
+ disablePoll(followerJetty, followerClient);
+ int nDocs = 500;
+ int docsAdded = 0;
+
+ UpdateRequest commitReq = new UpdateRequest();
+ withBasicAuth(commitReq);
+ for (int i = 0; docsAdded < nDocs / 2; i++, docsAdded++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ String[] fields = {"id", i + "", "name", "name = " + i};
+ for (int j = 0; j < fields.length; j += 2) {
+ doc.addField(fields[j], fields[j + 1]);
+ }
+ UpdateRequest req = new UpdateRequest();
+ withBasicAuth(req).add(doc);
+ req.process(leaderClient, DEFAULT_TEST_CORENAME);
+ if (i % 10 == 0) {
+ commitReq.commit(leaderClient, DEFAULT_TEST_CORENAME);
+ }
+ }
+ commitReq.commit(leaderClient, DEFAULT_TEST_CORENAME);
+
+ assertEquals(
+ docsAdded,
+ queryWithBasicAuth(leaderClient, new SolrQuery("*:*")).getResults().getNumFound());
+
+ // Without Auth credentials fetchIndex will fail
+ pullIndexFromTo(leaderJetty, followerJetty, false);
+ assertNotEquals(
+ docsAdded,
+ queryWithBasicAuth(followerClient, new SolrQuery("*:*")).getResults().getNumFound());
+
+ // With Auth credentials
+ pullIndexFromTo(leaderJetty, followerJetty, true);
+ assertEquals(
+ docsAdded,
+ queryWithBasicAuth(followerClient, new SolrQuery("*:*")).getResults().getNumFound());
+ }
+
+ @Test
+ public void doTestAutoReplicationWithAuthEnabled() throws Exception {
+ int nDocs = 250;
+ UpdateRequest commitReq = new UpdateRequest();
+ withBasicAuth(commitReq);
+ for (int i = 0; i < nDocs; i++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ String[] fields = {"id", i + "", "name", "name = " + i};
+ for (int j = 0; j < fields.length; j += 2) {
+ doc.addField(fields[j], fields[j + 1]);
+ }
+ UpdateRequest req = new UpdateRequest();
+ withBasicAuth(req).add(doc);
+ req.process(leaderClient, DEFAULT_TEST_CORENAME);
+ if (i % 10 == 0) {
+ commitReq.commit(leaderClient, DEFAULT_TEST_CORENAME);
+ }
+ }
+ commitReq.commit(leaderClient, DEFAULT_TEST_CORENAME);
+ // wait for followers to fetchIndex
+ Thread.sleep(5000);
+ // follower with auth should be healthy
+ HealthCheckRequest healthCheckRequestFollower = new HealthCheckRequest();
+ healthCheckRequestFollower.setMaxGenerationLag(2);
+ assertEquals(
+ CommonParams.OK,
+ healthCheckRequestFollower
+ .process(followerClientWithAuth)
+ .getResponse()
+ .get(CommonParams.STATUS));
+ // follower with auth should be unhealthy
+ healthCheckRequestFollower = new HealthCheckRequest();
+ healthCheckRequestFollower.setMaxGenerationLag(2);
+ assertEquals(
+ CommonParams.FAILURE,
+ healthCheckRequestFollower.process(followerClient).getResponse().get(CommonParams.STATUS));
+ }
+
+ private QueryResponse queryWithBasicAuth(SolrClient client, SolrQuery q)
+ throws IOException, SolrServerException {
+ return withBasicAuth(new QueryRequest(q)).process(client);
+ }
+
+ private void disablePoll(JettySolrRunner Jetty, SolrClient solrClient)
+ throws SolrServerException, IOException {
+ ModifiableSolrParams disablePollParams = new ModifiableSolrParams();
+ disablePollParams.set(COMMAND, CMD_DISABLE_POLL);
+ disablePollParams.set(CommonParams.WT, JAVABIN);
+ disablePollParams.set(CommonParams.QT, ReplicationHandler.PATH);
+ QueryRequest req = new QueryRequest(disablePollParams);
+ withBasicAuth(req);
+ req.setBasePath(buildUrl(Jetty.getLocalPort()));
+
+ solrClient.request(req, DEFAULT_TEST_CORENAME);
+ }
+
+ private void pullIndexFromTo(
+ JettySolrRunner srcSolr, JettySolrRunner destSolr, boolean authEnabled)
+ throws SolrServerException, IOException {
+ String srcUrl = buildUrl(srcSolr.getLocalPort()) + "/" + DEFAULT_TEST_CORENAME;
+ String destUrl = buildUrl(destSolr.getLocalPort()) + "/" + DEFAULT_TEST_CORENAME;
+ QueryRequest req = getQueryRequestForFetchIndex(authEnabled, srcUrl);
+ req.setBasePath(buildUrl(destSolr.getLocalPort()));
+ followerClient.request(req, DEFAULT_TEST_CORENAME);
+ }
+
+ private QueryRequest getQueryRequestForFetchIndex(boolean authEnabled, String srcUrl) {
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.set(COMMAND, CMD_FETCH_INDEX);
+ solrParams.set(CommonParams.WT, JAVABIN);
+ solrParams.set(CommonParams.QT, ReplicationHandler.PATH);
+ solrParams.set("leaderUrl", srcUrl);
+ solrParams.set("wait", "true");
+ if (authEnabled) {
+ solrParams.set("httpBasicAuthUser", user);
+ solrParams.set("httpBasicAuthPassword", pass);
+ }
+ QueryRequest req = new QueryRequest(solrParams);
+ return req;
+ }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 40fd27b..e437e2f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -135,7 +135,9 @@
this.httpClient = createHttpClient(builder);
this.closeClient = true;
}
-
+ if (builder.listenerFactory != null) {
+ this.listenerFactory.addAll(builder.listenerFactory);
+ }
updateDefaultMimeTypeForParser();
this.httpClient.setFollowRedirects(Boolean.TRUE.equals(builder.followRedirects));
@@ -147,6 +149,10 @@
this.listenerFactory.add(factory);
}
+ public List<HttpListenerFactory> getListenerFactory() {
+ return listenerFactory;
+ }
+
// internal usage only
HttpClient getHttpClient() {
return httpClient;
@@ -845,6 +851,13 @@
protected Long keyStoreReloadIntervalSecs;
+ public Http2SolrClient.Builder withListenerFactory(List<HttpListenerFactory> listenerFactory) {
+ this.listenerFactory = listenerFactory;
+ return this;
+ }
+
+ private List<HttpListenerFactory> listenerFactory;
+
public Builder() {
super();
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
index a8831fd..b664295 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/Http2SolrClientTest.java
@@ -587,5 +587,27 @@
}
}
+ @Test
+ public void testIdleTimeoutWithHttpClient() {
+ try (Http2SolrClient oldClient =
+ new Http2SolrClient.Builder("baseSolrUrl")
+ .withIdleTimeout(5000, TimeUnit.MILLISECONDS)
+ .build()) {
+ try (Http2SolrClient onlyBaseUrlChangedClient =
+ new Http2SolrClient.Builder("newBaseSolrUrl").withHttpClient(oldClient).build()) {
+ assertEquals(oldClient.getIdleTimeout(), onlyBaseUrlChangedClient.getIdleTimeout());
+ assertEquals(oldClient.getHttpClient(), onlyBaseUrlChangedClient.getHttpClient());
+ }
+ try (Http2SolrClient idleTimeoutChangedClient =
+ new Http2SolrClient.Builder("baseSolrUrl")
+ .withHttpClient(oldClient)
+ .withIdleTimeout(3000, TimeUnit.MILLISECONDS)
+ .build()) {
+ assertFalse(oldClient.getIdleTimeout() == idleTimeoutChangedClient.getIdleTimeout());
+ assertEquals(3000, idleTimeoutChangedClient.getIdleTimeout());
+ }
+ }
+ }
+
/* Missed tests : - set cookies via interceptor - invariant params - compression */
}