blob: ce9559273e10be75a5c348228d46f0c214d66a7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.SocketException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.LukeRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.index.LogDocMergePolicyFactory;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.MockStreamingSolrClients.Exp;
import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.ForwardNode;
import org.apache.solr.update.SolrCmdDistributor.Node;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
import org.apache.solr.util.TestInjection;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.xml.sax.SAXException;
// See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
/** Flavor of {@link Node} a retry scenario is run against: {@link ForwardNode} or {@link StdNode}. */
private enum NodeType { FORWARD, STANDARD }

/** Generator of the document ids used by every scenario in this class. */
private final AtomicInteger id = new AtomicInteger();
/** System property set for the lifetime of this class and cleared again afterwards. */
private static final String POLL_QUEUE_TIME_PROP = "solr.cloud.client.pollQueueTime";

/**
 * Pins the merge policy factory and the client poll-queue time before the test runs.
 */
@BeforeClass
public static void beforeClass() throws Exception {
  // The randomized merge policy cannot be used: the test depends on
  // optimize expunging every delete.
  final String mergePolicyFactory = LogDocMergePolicyFactory.class.getName();
  systemSetPropertySolrTestsMergePolicyFactory(mergePolicyFactory);
  System.setProperty(POLL_QUEUE_TIME_PROP, "2000");
}

/**
 * Undoes the property changes made in {@link #beforeClass()}.
 */
@AfterClass
public static void afterClass() {
  systemClearPropertySolrTestsMergePolicyFactory();
  System.clearProperty(POLL_QUEUE_TIME_PROP);
}
/** Shard handler handed to every distributor built by this test; closed in {@link #distribTearDown()}. */
private final UpdateShardHandler updateShardHandler;

/**
 * Builds the shared {@link UpdateShardHandler} from the default configuration and
 * sets the inherited {@code stress} setting to zero.
 */
public SolrCmdDistributorTest() throws ParserConfigurationException, IOException, SAXException {
  this.updateShardHandler = new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
  this.stress = 0;
}
/**
 * Names the schema file passed to every Jetty created by {@code createServers}.
 *
 * @return the schema file name
 */
public static String getSchemaFile() {
  final String schemaFile = "schema.xml";
  return schemaFile;
}
/**
 * Names the solrconfig file passed to every Jetty created by {@code createServers}.
 *
 * @return the solrconfig file name
 */
public static String getSolrConfigFile() {
  // chosen because it is minimal and still provides /update
  final String configFile = "solrconfig-tlog.xml";
  return configFile;
}
// TODO: this method is redefined for now so that it picks up the two getters above;
// because those getters are static, ordinary overriding gives no help.
/**
 * Starts one control Jetty and {@code numShards} shard Jettys, each with its own seeded
 * Solr home under {@code testDir}, and records their clients and URLs in the inherited
 * fields ({@code controlJetty}, {@code controlClient}, {@code jettys}, {@code clients},
 * {@code shardsArr}, {@code shards}).
 *
 * @param numShards number of shard Jettys to start
 */
@Override
protected void createServers(int numShards) throws Exception {
  System.setProperty("configSetBaseDir", TEST_HOME());

  final String solrConfig = getSolrConfigFile();
  final String schema = getSchemaFile();

  // control node
  final File controlHome = testDir.toPath().resolve("control").toFile();
  seedSolrHome(controlHome);
  final Path controlCore = controlHome.toPath().resolve("cores").resolve(DEFAULT_TEST_CORENAME);
  writeCoreProperties(controlCore, DEFAULT_TEST_CORENAME);
  controlJetty = createJetty(controlHome, testDir + "/control/data", null, solrConfig, schema);
  controlJetty.start();
  controlClient = createNewSolrClient(controlJetty.getLocalPort());

  // shard nodes; their URLs are also collected into a comma-separated list
  shardsArr = new String[numShards];
  final StringBuilder shardList = new StringBuilder();
  for (int shard = 0; shard < numShards; shard++) {
    if (shardList.length() != 0) {
      shardList.append(',');
    }
    final Path shardHome = testDir.toPath().resolve("shard" + shard);
    seedSolrHome(shardHome.toFile());
    final Path shardCore = shardHome.resolve("cores").resolve(DEFAULT_TEST_CORENAME);
    writeCoreProperties(shardCore, DEFAULT_TEST_CORENAME);

    final String dataDir = testDir + "/shard" + shard + "/data";
    final JettySolrRunner jetty = createJetty(shardHome.toFile(), dataDir, null, solrConfig, schema);
    jetty.start();
    jettys.add(jetty);
    clients.add(createNewSolrClient(jetty.getLocalPort()));

    final String shardUrl = buildUrl(jetty.getLocalPort());
    shardsArr[shard] = shardUrl;
    shardList.append(shardUrl);
  }
  shards = shardList.toString();
}
/**
 * Single JUnit entry point of this class, run against four shards plus the control node.
 *
 * <p>It first distributes adds, a delete-by-id and commits through real
 * {@link SolrCmdDistributor} instances and checks the resulting document counts on the
 * control client and on the first shard client. It then sends a randomized batch of adds,
 * checks that a distributed commit fires exactly one post-commit callback per shard and
 * that no shard reports deleted documents, and finally calls the remaining private
 * scenarios of this class in a fixed order.
 */
@SuppressWarnings("unchecked")
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
  del("*:*");

  ModifiableSolrParams params;
  List<Node> nodes = new ArrayList<>();
  AddUpdateCommand cmd = new AddUpdateCommand(null);
  List<Error> errors;
  CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
  long numFound;
  HttpSolrClient client;
  ZkNodeProps nodeProps;

  try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
    nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
        ((HttpSolrClient) controlClient).getBaseURL(),
        ZkStateReader.CORE_NAME_PROP, "");
    nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));

    // add one doc to controlClient
    cmd.solrDoc = sdoc("id", id.incrementAndGet());
    params = new ModifiableSolrParams();
    cmdDistrib.distribAdd(cmd, nodes, params);

    params = new ModifiableSolrParams();
    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    cmdDistrib.distribCommit(ccmd, nodes, params);
    cmdDistrib.finish();

    errors = cmdDistrib.getErrors();
    assertEquals(errors.toString(), 0, errors.size());

    numFound = controlClient.query(new SolrQuery("*:*")).getResults()
        .getNumFound();
    assertEquals(1, numFound);

    // from here on the node list holds the control node and the first shard
    client = (HttpSolrClient) clients.get(0);
    nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
        client.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
    nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
  }

  int id2;
  // add another 2 docs to control and 3 to client
  try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
    cmd.solrDoc = sdoc("id", id.incrementAndGet());
    params = new ModifiableSolrParams();
    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    cmdDistrib.distribAdd(cmd, nodes, params);

    id2 = id.incrementAndGet();
    AddUpdateCommand cmd2 = new AddUpdateCommand(null);
    cmd2.solrDoc = sdoc("id", id2);
    params = new ModifiableSolrParams();
    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    cmdDistrib.distribAdd(cmd2, nodes, params);

    // this one goes to the shard client only
    AddUpdateCommand cmd3 = new AddUpdateCommand(null);
    cmd3.solrDoc = sdoc("id", id.incrementAndGet());
    params = new ModifiableSolrParams();
    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    cmdDistrib.distribAdd(cmd3, Collections.singletonList(nodes.get(1)), params);

    params = new ModifiableSolrParams();
    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    cmdDistrib.distribCommit(ccmd, nodes, params);
    cmdDistrib.finish();
    errors = cmdDistrib.getErrors();
  }
  assertEquals(errors.toString(), 0, errors.size());

  SolrDocumentList results = controlClient.query(new SolrQuery("*:*")).getResults();
  numFound = results.getNumFound();
  assertEquals(results.toString(), 3, numFound);

  numFound = client.query(new SolrQuery("*:*")).getResults()
      .getNumFound();
  assertEquals(3, numFound);

  // now delete doc 2 which is on both control and client1
  DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
  dcmd.id = Integer.toString(id2);

  try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
    params = new ModifiableSolrParams();
    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    cmdDistrib.distribDelete(dcmd, nodes, params);

    params = new ModifiableSolrParams();
    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    cmdDistrib.distribCommit(ccmd, nodes, params);
    cmdDistrib.finish();
    errors = cmdDistrib.getErrors();
  }
  assertEquals(errors.toString(), 0, errors.size());

  results = controlClient.query(new SolrQuery("*:*")).getResults();
  numFound = results.getNumFound();
  assertEquals(results.toString(), 2, numFound);

  // report the shard client's own documents if its count is off, not the control client's
  SolrDocumentList shardResults = client.query(new SolrQuery("*:*")).getResults();
  assertEquals(shardResults.toString(), 2, shardResults.getNumFound());

  for (SolrClient c : clients) {
    c.optimize();
  }

  try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
    // send each new doc to a random subset of the shard clients
    int cnt = atLeast(303);
    for (int i = 0; i < cnt; i++) {
      nodes.clear();
      for (SolrClient c : clients) {
        if (random().nextBoolean()) {
          continue;
        }
        HttpSolrClient httpClient = (HttpSolrClient) c;
        nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
            httpClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
        nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
      }
      AddUpdateCommand c = new AddUpdateCommand(null);
      c.solrDoc = sdoc("id", id.incrementAndGet());
      if (nodes.size() > 0) {
        params = new ModifiableSolrParams();
        cmdDistrib.distribAdd(c, nodes, params);
      }
    }

    // the commit below targets every shard client
    nodes.clear();
    for (SolrClient c : clients) {
      HttpSolrClient httpClient = (HttpSolrClient) c;
      nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
          httpClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
      nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
    }

    // count the post-commit callbacks fired on the shard cores
    final AtomicInteger commits = new AtomicInteger();
    for (JettySolrRunner jetty : jettys) {
      CoreContainer cores = jetty.getCoreContainer();
      try (SolrCore core = cores.getCore("collection1")) {
        core.getUpdateHandler().registerCommitCallback(new SolrEventListener() {
          @Override
          public void init(@SuppressWarnings({"rawtypes"})NamedList args) {
          }

          @Override
          public void postSoftCommit() {
          }

          @Override
          public void postCommit() {
            commits.incrementAndGet();
          }

          @Override
          public void newSearcher(SolrIndexSearcher newSearcher,
              SolrIndexSearcher currentSearcher) {
          }
        });
      }
    }

    params = new ModifiableSolrParams();
    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    cmdDistrib.distribCommit(ccmd, nodes, params);
    cmdDistrib.finish();

    assertEquals(getShardCount(), commits.get());

    for (SolrClient c : clients) {
      NamedList<Object> resp = c.request(new LukeRequest());
      assertEquals("SOLR-3428: We only did adds - there should be no deletes",
          ((NamedList<Object>) resp.get("index")).get("numDocs"),
          ((NamedList<Object>) resp.get("index")).get("maxDoc"));
    }
  }

  // remaining scenarios; they share the servers and the id counter, so the order is kept
  testMaxRetries(NodeType.FORWARD);
  testMaxRetries(NodeType.STANDARD);
  testOneRetry(NodeType.FORWARD);
  testOneRetry(NodeType.STANDARD);
  testRetryNodeAgainstBadAddress();
  testStdNodeRetriesSocketError();
  testForwardNodeWontRetrySocketError();
  testNodeWontRetryBadRequest(NodeType.FORWARD);
  testNodeWontRetryBadRequest(NodeType.STANDARD);
  testMinRfOnRetries(NodeType.FORWARD);
  testMinRfOnRetries(NodeType.STANDARD);
  testDistribOpenSearcher();
  testReqShouldRetryNoRetries();
  testReqShouldRetryMaxRetries();
  testReqShouldRetryBadRequest();
  testReqShouldRetryNotFound();
  testReqShouldRetryDBQ();
  testDeletes(false, true);
  testDeletes(false, false);
  testDeletes(true, true);
  testDeletes(true, false);
  getRfFromResponseShouldNotCloseTheInputStream();
  testStuckUpdates();
}
/**
 * Adds five documents and deletes each one right after adding it, then checks that the
 * first shard ends up with as many documents as before and that no error is recorded.
 *
 * @param dbq          delete by query ({@code id:<n>}) when true, by id otherwise
 * @param withFailures when true the mock clients start out throwing connect exceptions;
 *                     the first {@code checkRetry} call switches the failure off again
 */
private void testDeletes(boolean dbq, boolean withFailures) throws Exception {
  final HttpSolrClient shardClient = (HttpSolrClient) clients.get(0);
  shardClient.commit(true, true);
  final long docsBefore = shardClient.query(new SolrQuery("*:*")).getResults().getNumFound();

  final MockStreamingSolrClients mockClients = new MockStreamingSolrClients(updateShardHandler);
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(mockClients, 0)) {
    if (withFailures) {
      mockClients.setExp(Exp.CONNECT_EXCEPTION);
    }

    final List<Node> targets = new ArrayList<>();
    final AtomicInteger retryChecks = new AtomicInteger();
    final ZkNodeProps shardProps = new ZkNodeProps(
        ZkStateReader.BASE_URL_PROP, shardClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
    targets.add(new StdNode(new ZkCoreNodeProps(shardProps), "collection1", "shard1", 5) {
      @Override
      public boolean checkRetry(Error err) {
        // stop injecting failures as soon as a retry is considered
        mockClients.setExp(null);
        retryChecks.incrementAndGet();
        return super.checkRetry(err);
      }
    });

    for (int round = 0; round < 5; round++) {
      final AddUpdateCommand add = new AddUpdateCommand(null);
      final int docId = id.incrementAndGet();
      add.solrDoc = sdoc("id", docId);
      final ModifiableSolrParams params = new ModifiableSolrParams();
      distributor.distribAdd(add, targets, params);

      final DeleteUpdateCommand delete = new DeleteUpdateCommand(null);
      if (!dbq) {
        delete.setId(String.valueOf(docId));
      } else {
        delete.setQuery("id:" + docId);
      }
      distributor.distribDelete(delete, targets, params, false, null, null);
    }

    distributor.distribCommit(new CommitUpdateCommand(null, false), targets, new ModifiableSolrParams());
    distributor.finish();

    // with failures: a delete-by-query run retries just the first command, a delete-by-id run retries ten times
    final int expectedRetryCount = withFailures ? (dbq ? 1 : 10) : 0;
    assertEquals(expectedRetryCount, retryChecks.get());

    final long docsAfter = shardClient.query(new SolrQuery("*:*")).getResults().getNumFound();
    // the injected java.net.ConnectException is one we retry on, so every add and delete arrives
    assertEquals(docsBefore, docsAfter);
    assertEquals(0, distributor.getErrors().size());
  }
}
/**
 * Sends one add that fails with connect exceptions until the third {@code checkRetry}
 * call, then checks that exactly three retry checks happened, that the leader tracker
 * reports an achieved replication factor of 2, and that no error is recorded.
 *
 * @param nodeType whether the target is a {@link ForwardNode} or a {@link StdNode}
 */
private void testMinRfOnRetries(NodeType nodeType) throws Exception {
  final HttpSolrClient shardClient = (HttpSolrClient) clients.get(0);
  final MockStreamingSolrClients mockClients = new MockStreamingSolrClients(updateShardHandler);
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(mockClients, 0)) {
    mockClients.setExp(Exp.CONNECT_EXCEPTION);

    final List<Node> targets = new ArrayList<>();
    final AtomicInteger retryChecks = new AtomicInteger();
    final ZkNodeProps shardProps = new ZkNodeProps(
        ZkStateReader.BASE_URL_PROP, shardClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");

    final Node target;
    if (nodeType != NodeType.FORWARD) {
      target = new StdNode(new ZkCoreNodeProps(shardProps), "collection1", "shard1", 5) {
        @Override
        public boolean checkRetry(Error err) {
          // the failure is lifted once the third retry check is reached
          if (retryChecks.incrementAndGet() >= 3) {
            mockClients.setExp(null);
          }
          return super.checkRetry(err);
        }
      };
    } else {
      target = new ForwardNode(new ZkCoreNodeProps(shardProps), null, "collection1", "shard1", 5) {
        @Override
        public boolean checkRetry(Error err) {
          // the failure is lifted once the third retry check is reached
          if (retryChecks.incrementAndGet() >= 3) {
            mockClients.setExp(null);
          }
          return super.checkRetry(err);
        }
      };
    }
    targets.add(target);

    final AddUpdateCommand add = new AddUpdateCommand(null);
    add.solrDoc = sdoc("id", id.incrementAndGet());
    final RollupRequestReplicationTracker rollupTracker = new RollupRequestReplicationTracker();
    final LeaderRequestReplicationTracker leaderTracker = new LeaderRequestReplicationTracker("shard1");

    distributor.distribAdd(add, targets, new ModifiableSolrParams(), false, rollupTracker, leaderTracker);
    distributor.finish();

    assertEquals(3, retryChecks.get());
    // "2" because one is the leader, which creates the LeaderRequestReplicationTracker,
    // and the second one is the node
    assertEquals(2, leaderTracker.getAchievedRf());
    assertEquals(0, distributor.getErrors().size());
  }
}
/**
 * Sends one add while the mock clients keep throwing connect exceptions, against a node
 * built with a retry limit of 6, and checks that {@code checkRetry} ran seven times and
 * that exactly one error is left.
 *
 * @param nodeType whether the target is a {@link ForwardNode} or a {@link StdNode}
 */
private void testMaxRetries(NodeType nodeType) throws IOException {
  final MockStreamingSolrClients mockClients = new MockStreamingSolrClients(updateShardHandler);
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(mockClients, 0)) {
    mockClients.setExp(Exp.CONNECT_EXCEPTION);

    final List<Node> targets = new ArrayList<>();
    final HttpSolrClient shardClient = (HttpSolrClient) clients.get(0);
    final AtomicInteger retryChecks = new AtomicInteger();
    final ZkNodeProps shardProps = new ZkNodeProps(
        ZkStateReader.BASE_URL_PROP, shardClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");

    final Node target;
    if (nodeType != NodeType.FORWARD) {
      target = new StdNode(new ZkCoreNodeProps(shardProps), "collection1", "shard1", 6) {
        @Override
        public boolean checkRetry(Error err) {
          retryChecks.incrementAndGet();
          return super.checkRetry(err);
        }
      };
    } else {
      target = new ForwardNode(new ZkCoreNodeProps(shardProps), null, "collection1", "shard1", 6) {
        @Override
        public boolean checkRetry(Error err) {
          retryChecks.incrementAndGet();
          return super.checkRetry(err);
        }
      };
    }
    targets.add(target);

    final AddUpdateCommand add = new AddUpdateCommand(null);
    add.solrDoc = sdoc("id", id.incrementAndGet());
    distributor.distribAdd(add, targets, new ModifiableSolrParams());
    distributor.finish();

    // one retry check more than the limit handed to the node
    assertEquals(7, retryChecks.get());
    assertEquals(1, distributor.getErrors().size());
  }
}
/** A request whose node was built with a retry limit of 0 must not retry a socket exception. */
private void testReqShouldRetryNoRetries() {
  final StdNode noRetryNode = new StdNode(null, "collection1", "shard1", 0);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, noRetryNode, new UpdateRequest(), true);
  final Error socketError = getError(new SocketException());
  assertFalse(request.shouldRetry(socketError));
}
/** A request carrying a delete-by-query must not retry a socket exception, even with one retry allowed. */
private void testReqShouldRetryDBQ() {
  final UpdateRequest deleteAll = new UpdateRequest();
  deleteAll.deleteByQuery("*:*");
  final StdNode oneRetryNode = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, oneRetryNode, deleteAll, true);
  final Error socketError = getError(new SocketException());
  assertFalse(request.shouldRetry(socketError));
}
/**
 * Passes a stream that records calls to {@code close()} into
 * {@code Req.trackRequestResult} and checks that the stream was left open.
 */
public void getRfFromResponseShouldNotCloseTheInputStream() {
  final AtomicBoolean streamClosed = new AtomicBoolean(false);
  final ByteArrayInputStream response = new ByteArrayInputStream(new byte[100]) {
    @Override
    public void close() throws IOException {
      streamClosed.set(true);
      super.close();
    }
  };

  final UpdateRequest deleteAll = new UpdateRequest();
  deleteAll.deleteByQuery("*:*");
  final StdNode target = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request = new SolrCmdDistributor.Req(null, target, deleteAll, true);

  request.trackRequestResult(null, response, true);
  assertFalse("Underlying stream should not be closed!", streamClosed.get());
}
/** With a retry limit of 1, a socket exception is retried until the request has used one retry. */
private void testReqShouldRetryMaxRetries() {
  final StdNode oneRetryNode = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, oneRetryNode, new UpdateRequest(), true);
  final Error socketError = getError(new SocketException());

  assertTrue(request.shouldRetry(socketError));
  // once the single allowed retry is counted, no further retry is offered
  request.retries++;
  assertFalse(request.shouldRetry(socketError));
}
/** A BAD_REQUEST {@link SolrException} must not be retried. */
private void testReqShouldRetryBadRequest() {
  final StdNode oneRetryNode = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, oneRetryNode, new UpdateRequest(), true);
  final SolrException badRequest = new SolrException(SolrException.ErrorCode.BAD_REQUEST, "bad request");
  assertFalse(request.shouldRetry(getError(badRequest)));
}
/** A NOT_FOUND {@link SolrException} is retried while a retry is still available. */
private void testReqShouldRetryNotFound() {
  final StdNode oneRetryNode = new StdNode(null, "collection1", "shard1", 1);
  final SolrCmdDistributor.Req request =
      new SolrCmdDistributor.Req(null, oneRetryNode, new UpdateRequest(), true);
  final SolrException notFound = new SolrException(SolrException.ErrorCode.NOT_FOUND, "not found");
  assertTrue(request.shouldRetry(getError(notFound)));
}
/**
 * Wraps an exception in a distributor {@link Error}.
 *
 * @param e the exception to wrap; for a {@link SolrException} its code is copied into
 *          {@code statusCode}, otherwise {@code statusCode} is left untouched
 * @return a new error holding {@code e}
 */
private Error getError(Exception e) {
  final Error wrapped = new Error();
  wrapped.e = e;
  if (!(e instanceof SolrException)) {
    return wrapped;
  }
  final SolrException solrException = (SolrException) e;
  wrapped.statusCode = solrException.code();
  return wrapped;
}
/**
 * Sends one add plus a commit while the mock clients throw a connect exception that is
 * switched off by the first {@code checkRetry} call. Expects exactly one retry check,
 * one more document on the first shard, and no recorded error.
 *
 * @param nodeType whether the target is a {@link ForwardNode} or a {@link StdNode}
 */
private void testOneRetry(NodeType nodeType) throws Exception {
  final HttpSolrClient shardClient = (HttpSolrClient) clients.get(0);
  final long docsBefore = shardClient.query(new SolrQuery("*:*")).getResults().getNumFound();

  final MockStreamingSolrClients mockClients = new MockStreamingSolrClients(updateShardHandler);
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(mockClients, 0)) {
    mockClients.setExp(Exp.CONNECT_EXCEPTION);

    final List<Node> targets = new ArrayList<>();
    final AtomicInteger retryChecks = new AtomicInteger();
    final ZkNodeProps shardProps = new ZkNodeProps(
        ZkStateReader.BASE_URL_PROP, shardClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");

    final Node target;
    if (nodeType != NodeType.FORWARD) {
      target = new StdNode(new ZkCoreNodeProps(shardProps), "collection1", "shard1", 5) {
        @Override
        public boolean checkRetry(Error err) {
          mockClients.setExp(null);
          retryChecks.incrementAndGet();
          return super.checkRetry(err);
        }
      };
    } else {
      target = new ForwardNode(new ZkCoreNodeProps(shardProps), null, "collection1", "shard1", 5) {
        @Override
        public boolean checkRetry(Error err) {
          mockClients.setExp(null);
          retryChecks.incrementAndGet();
          return super.checkRetry(err);
        }
      };
    }
    targets.add(target);

    final AddUpdateCommand add = new AddUpdateCommand(null);
    add.solrDoc = sdoc("id", id.incrementAndGet());
    // the same params instance is used for the add and for the commit
    final ModifiableSolrParams sharedParams = new ModifiableSolrParams();
    final CommitUpdateCommand commit = new CommitUpdateCommand(null, false);
    distributor.distribAdd(add, targets, sharedParams);
    distributor.distribCommit(commit, targets, sharedParams);
    distributor.finish();

    assertEquals(1, retryChecks.get());

    final long docsAfter = shardClient.query(new SolrQuery("*:*")).getResults().getNumFound();
    // the injected java.net.ConnectException is one we retry on, so the document arrives
    assertEquals(docsBefore + 1, docsAfter);
    assertEquals(0, distributor.getErrors().size());
  }
}
/**
 * Sends one add while the mock clients report a bad request, then a commit with the
 * failure switched off. Expects a single {@code checkRetry} call that does not lead to a
 * retry, an unchanged document count on the first shard, and exactly one recorded error.
 *
 * <p>The "Bad Request" ignore pattern is removed in a {@code finally} block so that a
 * failing assertion here cannot leave it registered for later code.
 *
 * @param nodeType whether the target is a {@link ForwardNode} or a {@link StdNode}
 */
private void testNodeWontRetryBadRequest(NodeType nodeType) throws Exception {
  ignoreException("Bad Request");
  try {
    final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
    long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
        .getNumFound();
    final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
      streamingClients.setExp(Exp.BAD_REQUEST);
      List<Node> nodes = new ArrayList<>();
      ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
          ZkStateReader.CORE_NAME_PROP, "");

      final AtomicInteger retries = new AtomicInteger();
      Node retryNode;
      if (nodeType == NodeType.FORWARD) {
        retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
          @Override
          public boolean checkRetry(Error err) {
            retries.incrementAndGet();
            return super.checkRetry(err);
          }
        };
      } else {
        retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
          @Override
          public boolean checkRetry(Error err) {
            retries.incrementAndGet();
            return super.checkRetry(err);
          }
        };
      }
      nodes.add(retryNode);

      AddUpdateCommand cmd = new AddUpdateCommand(null);
      cmd.solrDoc = sdoc("id", id.incrementAndGet());
      ModifiableSolrParams params = new ModifiableSolrParams();
      CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
      cmdDistrib.distribAdd(cmd, nodes, params);

      // only the add is meant to fail; the commit goes through normally
      streamingClients.setExp(null);
      cmdDistrib.distribCommit(ccmd, nodes, params);
      cmdDistrib.finish();

      // it will checkRetry, but not actually do it...
      assertEquals(1, retries.get());

      long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
          .getNumFound();
      // the add was rejected as a bad request, which we don't retry on
      assertEquals(numFoundBefore, numFoundAfter);
      assertEquals(1, cmdDistrib.getErrors().size());
    }
  } finally {
    unIgnoreException("Bad Request");
  }
}
/**
 * Sends one add to a {@link ForwardNode} while the mock clients throw a socket
 * exception, then a commit with the failure switched off. Expects a single
 * {@code checkRetry} call, an unchanged document count, and exactly one recorded error.
 */
private void testForwardNodeWontRetrySocketError() throws Exception {
  final HttpSolrClient shardClient = (HttpSolrClient) clients.get(0);
  final long docsBefore = shardClient.query(new SolrQuery("*:*")).getResults().getNumFound();

  final MockStreamingSolrClients mockClients = new MockStreamingSolrClients(updateShardHandler);
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(mockClients, 0)) {
    mockClients.setExp(Exp.SOCKET_EXCEPTION);

    final List<Node> targets = new ArrayList<>();
    final AtomicInteger retryChecks = new AtomicInteger();
    final ZkNodeProps shardProps = new ZkNodeProps(
        ZkStateReader.BASE_URL_PROP, shardClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
    final ForwardNode forwardNode =
        new ForwardNode(new ZkCoreNodeProps(shardProps), null, "collection1", "shard1", 5) {
          @Override
          public boolean checkRetry(Error err) {
            retryChecks.incrementAndGet();
            return super.checkRetry(err);
          }
        };
    targets.add(forwardNode);

    final AddUpdateCommand add = new AddUpdateCommand(null);
    add.solrDoc = sdoc("id", id.incrementAndGet());
    // the same params instance is used for the add and for the commit
    final ModifiableSolrParams sharedParams = new ModifiableSolrParams();
    final CommitUpdateCommand commit = new CommitUpdateCommand(null, false);
    distributor.distribAdd(add, targets, sharedParams);

    // only the add is meant to fail
    mockClients.setExp(null);
    distributor.distribCommit(commit, targets, sharedParams);
    distributor.finish();

    // checkRetry is consulted once, but the retry itself does not happen
    assertEquals(1, retryChecks.get());

    final long docsAfter = shardClient.query(new SolrQuery("*:*")).getResults().getNumFound();
    // the socket exception is not retried, so the document never arrives
    assertEquals(docsBefore, docsAfter);
    assertEquals(1, distributor.getErrors().size());
  }
}
/**
 * Sends one add to a {@link StdNode} built with a retry limit of 5 while the mock
 * clients keep throwing socket exceptions, and checks that {@code checkRetry} ran six times.
 */
private void testStdNodeRetriesSocketError() throws Exception {
  final HttpSolrClient shardClient = (HttpSolrClient) clients.get(0);
  final MockStreamingSolrClients mockClients = new MockStreamingSolrClients(updateShardHandler);
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(mockClients, 0)) {
    mockClients.setExp(Exp.SOCKET_EXCEPTION);

    final List<Node> targets = new ArrayList<>();
    final AtomicInteger retryChecks = new AtomicInteger();
    final ZkNodeProps shardProps = new ZkNodeProps(
        ZkStateReader.BASE_URL_PROP, shardClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
    targets.add(new StdNode(new ZkCoreNodeProps(shardProps), "collection1", "shard1", 5) {
      @Override
      public boolean checkRetry(Error err) {
        retryChecks.incrementAndGet();
        return super.checkRetry(err);
      }
    });

    final AddUpdateCommand add = new AddUpdateCommand(null);
    add.solrDoc = sdoc("id", id.incrementAndGet());
    distributor.distribAdd(add, targets, new ModifiableSolrParams());
    distributor.finish();

    // unlike a forward node, a standard node keeps retrying socket errors:
    // one retry check more than the limit handed to the node
    assertEquals(6, retryChecks.get());
  }
}
/**
 * Sends an add and a commit to a {@link ForwardNode} that initially points at
 * {@code DEAD_HOST_1}; its {@code checkRetry} override re-points the node at the first
 * shard before delegating. Both outcomes are accepted: the document arrived and no error
 * was recorded, or nothing arrived and exactly one error was recorded.
 */
private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException {
  // Test RetryNode
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(updateShardHandler)) {
    final HttpSolrClient shardClient = (HttpSolrClient) clients.get(0);
    final long docsBefore = shardClient.query(new SolrQuery("*:*")).getResults().getNumFound();

    final List<Node> targets = new ArrayList<>();
    final ZkNodeProps deadProps = new ZkNodeProps(
        ZkStateReader.BASE_URL_PROP, DEAD_HOST_1 + context, ZkStateReader.CORE_NAME_PROP, "");
    final ForwardNode forwardNode =
        new ForwardNode(new ZkCoreNodeProps(deadProps), null, "collection1", "shard1", 5) {
          @Override
          public boolean checkRetry(Error err) {
            // swap the dead address for the live shard before the retry decision is made
            ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, shardClient.getBaseURL(),
                ZkStateReader.CORE_NAME_PROP, "");
            this.nodeProps = new ZkCoreNodeProps(leaderProps);
            return super.checkRetry(err);
          }
        };
    targets.add(forwardNode);

    final AddUpdateCommand add = new AddUpdateCommand(null);
    add.solrDoc = sdoc("id", id.incrementAndGet());
    distributor.distribAdd(add, targets, new ModifiableSolrParams());

    final CommitUpdateCommand commit = new CommitUpdateCommand(null, false);
    final ModifiableSolrParams commitParams = new ModifiableSolrParams();
    commitParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
    distributor.distribCommit(commit, targets, commitParams);
    distributor.finish();

    final long docsAfter = shardClient.query(new SolrQuery("*:*")).getResults().getNumFound();

    // different OS's will throw different exceptions for the bad address above
    if (docsBefore == docsAfter) {
      // we will get java.net.SocketException: Network is unreachable and not retry
      assertEquals(docsBefore, docsAfter);
      assertEquals(1, distributor.getErrors().size());
    } else {
      assertEquals(0, distributor.getErrors().size());
      assertEquals(docsBefore + 1, docsAfter);
    }
  }
}
/**
 * Closes the shared {@link UpdateShardHandler} and then runs the inherited tear-down.
 *
 * <p>The inherited tear-down sits in a {@code finally} block so that it still runs when
 * closing the handler throws.
 */
@Override
public void distribTearDown() throws Exception {
  try {
    updateShardHandler.close();
  } finally {
    super.distribTearDown();
  }
}
/**
 * Checks that {@code addCommit} copies the commit command's {@code openSearcher} flag
 * into the update request parameters: true for a freshly built command, false once the
 * flag has been cleared.
 */
private void testDistribOpenSearcher() {
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(updateShardHandler)) {
    final UpdateRequest request = new UpdateRequest();
    final CommitUpdateCommand commit = new CommitUpdateCommand(null, false);

    // default value (should be true); the fallback passed to getBool is the opposite
    distributor.addCommit(request, commit);
    assertTrue(request.getParams().getBool(UpdateParams.OPEN_SEARCHER, false));

    // openSearcher = false, written into the same request
    commit.openSearcher = false;
    distributor.addCommit(request, commit);
    assertFalse(request.getParams().getBool(UpdateParams.OPEN_SEARCHER, true));
  }
}
/**
 * Installs a one-count {@code TestInjection.directUpdateLatch}, sends three adds to
 * random subsets of the shard clients and calls {@code blockAndDoRetries}. If an
 * {@link IOException} is thrown, its text must mention "processing has stalled". The
 * latch is counted down in the {@code finally} block.
 * NOTE(review): presumably the latch holds direct updates back until released — confirm
 * against TestInjection.
 */
private void testStuckUpdates() throws Exception {
  TestInjection.directUpdateLatch = new CountDownLatch(1);
  final List<Node> targets = new ArrayList<>();
  try (SolrCmdDistributor distributor = new SolrCmdDistributor(updateShardHandler)) {
    for (int round = 0; round < 3; round++) {
      targets.clear();
      for (SolrClient candidate : clients) {
        // each client is included on a coin flip
        if (!random().nextBoolean()) {
          final HttpSolrClient httpClient = (HttpSolrClient) candidate;
          final ZkNodeProps shardProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
              httpClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
          targets.add(new StdNode(new ZkCoreNodeProps(shardProps)));
        }
      }
      final AddUpdateCommand add = new AddUpdateCommand(null);
      add.solrDoc = sdoc("id", id.incrementAndGet());
      if (!targets.isEmpty()) {
        distributor.distribAdd(add, targets, new ModifiableSolrParams(), false);
      }
    }
    distributor.blockAndDoRetries();
  } catch (IOException e) {
    final String description = e.toString();
    assertTrue(description, description.contains("processing has stalled"));
  } finally {
    TestInjection.directUpdateLatch.countDown();
  }
}
}