blob: 5169d22d3a2bb0b16e6cd52abe2693c3aec2257f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.util.TimeOut;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Test PeerSync when a node restarts and documents are indexed when node was down.
*
* This test is modeled after SyncSliceTest
*/
@Slow
public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private boolean success = false;
int docId = 0;
List<CloudJettyRunner> nodesDown = new ArrayList<>();
@Override
public void distribTearDown() throws Exception {
if (!success) {
printLayoutOnTearDown = true;
}
System.clearProperty("distribUpdateSoTimeout");
System.clearProperty("solr.directoryFactory");
System.clearProperty("solr.ulog.numRecordsToKeep");
System.clearProperty("tests.zk.violationReportAction");
super.distribTearDown();
}
public PeerSyncReplicationTest() {
super();
sliceCount = 1;
fixShardCount(3);
}
protected String getCloudSolrConfig() {
return "solrconfig-tlog.xml";
}
@Override
public void distribSetUp() throws Exception {
// set socket timeout small, so replica won't be put into LIR state when they restart
System.setProperty("distribUpdateSoTimeout", "3000");
// tlog gets deleted after node restarts if we use CachingDirectoryFactory.
// make sure that tlog stays intact after we restart a node
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
System.setProperty("tests.zk.violationReportAction", LimitViolationAction.IGNORE.toString());
super.distribSetUp();
}
@Test
//commented 2-Aug-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
public void test() throws Exception {
handle.clear();
handle.put("timestamp", SKIPVAL);
waitForThingsToLevelOut(30);
del("*:*");
// index enough docs and commit to establish frame of reference for PeerSync
for (int i = 0; i < 100; i++) {
indexDoc(id, docId, i1, 50, tlong, 50, t1,
"document number " + docId++);
}
commit();
waitForThingsToLevelOut(30);
try {
checkShardConsistency(false, true);
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(docId, cloudClientDocs);
CloudJettyRunner initialLeaderJetty = shardToLeaderJetty.get("shard1");
List<CloudJettyRunner> otherJetties = getOtherAvailableJetties(initialLeaderJetty);
CloudJettyRunner neverLeader = otherJetties.get(otherJetties.size() - 1);
otherJetties.remove(neverLeader) ;
// first shutdown a node that will never be a leader
forceNodeFailures(singletonList(neverLeader));
// node failure and recovery via PeerSync
log.info("Forcing PeerSync");
CloudJettyRunner nodePeerSynced = forceNodeFailureAndDoPeerSync(false);
// add a few more docs
indexDoc(id, docId, i1, 50, tlong, 50, t1,
"document number " + docId++);
indexDoc(id, docId, i1, 50, tlong, 50, t1,
"document number " + docId++);
commit();
cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(docId, cloudClientDocs);
// now shutdown all other nodes except for 'nodeShutDownForFailure'
otherJetties.remove(nodePeerSynced);
forceNodeFailures(otherJetties);
waitForThingsToLevelOut(30);
checkShardConsistency(false, true);
// now shutdown the original leader
log.info("Now shutting down initial leader");
forceNodeFailures(singletonList(initialLeaderJetty));
log.info("Updating mappings from zk");
waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, new TimeOut(15, SECONDS, TimeSource.NANO_TIME));
updateMappingsFromZk(jettys, clients, true);
assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
// bring up node that was down all along, and let it PeerSync from the node that was forced to PeerSynce
bringUpDeadNodeAndEnsureNoReplication(neverLeader, false);
waitTillNodesActive();
checkShardConsistency(false, true);
// bring back all the nodes including initial leader
// (commented as reports Maximum concurrent create/delete watches above limit violation and reports thread leaks)
/*for(int i = 0 ; i < nodesDown.size(); i++) {
bringUpDeadNodeAndEnsureNoReplication(shardToLeaderJetty.get("shard1"), neverLeader, false);
}
checkShardConsistency(false, true);*/
// make sure leader has not changed after bringing initial leader back
assertEquals(nodePeerSynced, shardToLeaderJetty.get("shard1"));
// assert metrics
SolrMetricManager manager = nodePeerSynced.jetty.getCoreContainer().getMetricManager();
MetricRegistry registry = null;
for (String name : manager.registryNames()) {
if (name.startsWith("solr.core.collection1")) {
registry = manager.registry(name);
break;
}
}
assertNotNull(registry);
Map<String, Metric> metrics = registry.getMetrics();
assertTrue("REPLICATION.peerSync.time present", metrics.containsKey("REPLICATION.peerSync.time"));
assertTrue("REPLICATION.peerSync.errors present", metrics.containsKey("REPLICATION.peerSync.errors"));
Counter counter = (Counter)metrics.get("REPLICATION.peerSync.errors");
assertEquals(0L, counter.getCount());
success = true;
} finally {
System.clearProperty("solr.disableFingerprint");
}
}
class IndexInBackGround extends Thread {
private int numDocs;
private CloudJettyRunner runner;
public IndexInBackGround(int numDocs, CloudJettyRunner nodeToBringUp) {
super(getClassName());
this.numDocs = numDocs;
this.runner = nodeToBringUp;
}
public void run() {
try {
// If we don't wait for cores get loaded, the leader may put this replica into LIR state
waitForCoreLoading();
for (int i = 0; i < numDocs; i++) {
indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId);
docId++;
// slow down adds, to get documents indexed while in PeerSync
Thread.sleep(100);
}
} catch (Exception e) {
log.error("Error indexing doc in background", e);
//Throwing an error here will kill the thread
}
}
private void waitForCoreLoading() throws InterruptedException {
while (true) {
if (runner.jetty.getCoreContainer() != null) {
CoreContainer cc = runner.jetty.getCoreContainer();
cc.waitForLoadingCoresToFinish(20000);
break;
}
Thread.sleep(100);
}
}
}
private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
replicaToShutDown.jetty.stop();
}
int totalDown = 0;
Set<CloudJettyRunner> jetties = new HashSet<>();
jetties.addAll(shardToJetty.get("shard1"));
if (replicasToShutDown != null) {
jetties.removeAll(replicasToShutDown);
totalDown += replicasToShutDown.size();
}
jetties.removeAll(nodesDown);
totalDown += nodesDown.size();
assertEquals(getShardCount() - totalDown, jetties.size());
nodesDown.addAll(replicasToShutDown);
}
private CloudJettyRunner forceNodeFailureAndDoPeerSync(boolean disableFingerprint)
throws Exception {
// kill non leader - new leader could have all the docs or be missing one
CloudJettyRunner leaderJetty = shardToLeaderJetty.get("shard1");
List<CloudJettyRunner> nonLeaderJetties = getOtherAvailableJetties(leaderJetty);
CloudJettyRunner replicaToShutDown = nonLeaderJetties.get(random().nextInt(nonLeaderJetties.size())); // random non leader node
forceNodeFailures(Arrays.asList(replicaToShutDown));
// two docs need to be sync'd back when replica restarts
indexDoc(id, docId, i1, 50, tlong, 50, t1,
"document number " + docId++);
indexDoc(id, docId, i1, 50, tlong, 50, t1,
"document number " + docId++);
commit();
bringUpDeadNodeAndEnsureNoReplication(replicaToShutDown, disableFingerprint);
return replicaToShutDown;
}
private void bringUpDeadNodeAndEnsureNoReplication(CloudJettyRunner nodeToBringUp, boolean disableFingerprint)
throws Exception {
// disable fingerprint check if needed
System.setProperty("solr.disableFingerprint", String.valueOf(disableFingerprint));
// we wait a little bit, so socket between leader -> replica will be timeout
Thread.sleep(3000);
IndexInBackGround iib = new IndexInBackGround(50, nodeToBringUp);
iib.start();
// bring back dead node and ensure it recovers
nodeToBringUp.jetty.start();
nodesDown.remove(nodeToBringUp);
waitTillNodesActive();
waitForThingsToLevelOut(30);
Set<CloudJettyRunner> jetties = new HashSet<>();
jetties.addAll(shardToJetty.get("shard1"));
jetties.removeAll(nodesDown);
assertEquals(getShardCount() - nodesDown.size(), jetties.size());
waitForThingsToLevelOut(30);
iib.join();
cloudClient.commit();
checkShardConsistency(false, false);
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(docId, cloudClientDocs);
// if there was no replication, we should not have replication.properties file
String replicationProperties = nodeToBringUp.jetty.getSolrHome() + "/cores/" + DEFAULT_TEST_COLLECTION_NAME + "/data/replication.properties";
assertTrue("PeerSync failed. Had to fail back to replication", Files.notExists(Paths.get(replicationProperties)));
}
private void waitTillNodesActive() throws Exception {
for (int i = 0; i < 60; i++) {
Thread.sleep(3000);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection collection1 = clusterState.getCollection("collection1");
Slice slice = collection1.getSlice("shard1");
Collection<Replica> replicas = slice.getReplicas();
boolean allActive = true;
Collection<String> nodesDownNames =
nodesDown.stream()
.map(n -> n.coreNodeName)
.collect(Collectors.toList());
Collection<Replica> replicasToCheck =
replicas.stream()
.filter(r -> !nodesDownNames.contains(r.getName()))
.collect(Collectors.toList());
for (Replica replica : replicasToCheck) {
if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
allActive = false;
break;
}
}
if (allActive) {
return;
}
}
printLayout();
fail("timeout waiting to see all nodes active");
}
private List<CloudJettyRunner> getOtherAvailableJetties(CloudJettyRunner leader) {
List<CloudJettyRunner> candidates = new ArrayList<>();
candidates.addAll(shardToJetty.get("shard1"));
if (leader != null) {
candidates.remove(leader);
}
candidates.removeAll(nodesDown);
return candidates;
}
protected void indexDoc(Object... fields) throws IOException,
SolrServerException {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
addFields(doc, "rnd_s", RandomStringUtils.random(random().nextInt(100) + 100));
UpdateRequest ureq = new UpdateRequest();
ureq.add(doc);
ModifiableSolrParams params = new ModifiableSolrParams();
ureq.setParams(params);
ureq.process(cloudClient);
}
// skip the randoms - they can deadlock...
@Override
protected void indexr(Object... fields) throws Exception {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
addFields(doc, "rnd_b", true);
indexDoc(doc);
}
}