/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.util.NamedList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
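/**
* Tests that replicas which missed updates (because they were down or network-partitioned)
* cannot be elected leader, and that the former leader reclaims leadership once it returns.
*/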
public class TestCloudConsistency extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static Map<JettySolrRunner, SocketProxy> proxies;
private static Map<URI, JettySolrRunner> jettys;
@Before
public void setupCluster() throws Exception {
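// On-disk indexes survive restarts, extra update-log records allow peer sync after downtime,
// and a long leaderVoteWait keeps returning replicas from force-electing themselves as leader.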
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
System.setProperty("leaderVoteWait", "60000");
configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
.configure();
// Add proxies
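// Each Jetty node sits behind a SocketProxy; closing a proxy later simulates a network partition for that node.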
proxies = new HashMap<>(cluster.getJettySolrRunners().size());
jettys = new HashMap<>();
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
SocketProxy proxy = new SocketProxy();
jetty.setProxyPort(proxy.getListenPort());
cluster.stopJettySolrRunner(jetty); // TODO: can we avoid this restart?
cluster.startJettySolrRunner(jetty);
proxy.open(jetty.getBaseUrl().toURI());
if (log.isInfoEnabled()) {
log.info("Adding proxy for URL: {}. Proxy: {}", jetty.getBaseUrl(), proxy.getUrl());
}
proxies.put(jetty, proxy);
jettys.put(proxy.getUrl(), jetty);
}
}
@After
public void tearDownCluster() throws Exception {
if (null != proxies) {
for (SocketProxy proxy : proxies.values()) {
proxy.close();
}
proxies = null;
}
jettys = null;
System.clearProperty("solr.directoryFactory");
System.clearProperty("solr.ulog.numRecordsToKeep");
System.clearProperty("leaderVoteWait");
shutdownCluster();
}
@Test
public void testOutOfSyncReplicasCannotBecomeLeader() throws Exception {
testOutOfSyncReplicasCannotBecomeLeader(false);
}
@Test
public void testOutOfSyncReplicasCannotBecomeLeaderAfterRestart() throws Exception {
testOutOfSyncReplicasCannotBecomeLeader(true);
}
public void testOutOfSyncReplicasCannotBecomeLeader(boolean onRestart) throws Exception {
final String collectionName = "outOfSyncReplicasCannotBecomeLeader-" + onRestart;
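// Create an empty collection, then place replicas one node at a time so the first replica (and leader) lands on node 0.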
CollectionAdminRequest.createCollection(collectionName, 1, 3)
.setCreateNodeSet("")
.process(cluster.getSolrClient());
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.setNode(cluster.getJettySolrRunner(0).getNodeName())
.process(cluster.getSolrClient());
waitForState("Timeout waiting for shard leader", collectionName, clusterShape(1, 1));
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.setNode(cluster.getJettySolrRunner(1).getNodeName())
.process(cluster.getSolrClient());
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.setNode(cluster.getJettySolrRunner(2).getNodeName())
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(collectionName, 1, 3);
waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3));
addDocs(collectionName, 3, 1);
final Replica oldLeader = getCollectionState(collectionName).getSlice("shard1").getLeader();
assertEquals(cluster.getJettySolrRunner(0).getNodeName(), oldLeader.getNodeName());
if (onRestart) {
addDocToWhenOtherReplicasAreDown(collectionName, oldLeader, 4);
} else {
addDocWhenOtherReplicasAreNetworkPartitioned(collectionName, oldLeader, 4);
}
assertDocsExistInAllReplicas(getCollectionState(collectionName).getReplicas(), collectionName, 1, 4);
CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
}
/**
* Adds a doc while the non-leader replicas are down.
* Those replicas are then out-of-sync, so they must not become leader even when the current leader goes DOWN.
* The leader is expected to be on node 0.
*/
private void addDocToWhenOtherReplicasAreDown(String collection, Replica leader, int docId) throws Exception {
JettySolrRunner j1 = cluster.getJettySolrRunner(1);
JettySolrRunner j2 = cluster.getJettySolrRunner(2);
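// Stop the two follower nodes so the next update reaches only the leader.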
j1.stop();
j2.stop();
cluster.waitForJettyToStop(j1);
cluster.waitForJettyToStop(j2);
waitForState("", collection, (liveNodes, collectionState) ->
collectionState.getSlice("shard1").getReplicas().stream()
.filter(replica -> replica.getState() == Replica.State.DOWN).count() == 2);
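// With the followers down, this document lands only on the leader.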
addDocs(collection, 1, docId);
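// Now stop the leader as well; the followers are missing the latest document.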
JettySolrRunner leaderJetty = cluster.getJettySolrRunner(0);
leaderJetty.stop();
cluster.waitForJettyToStop(leaderJetty);
waitForState("", collection, (liveNodes, collectionState) -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
cluster.getJettySolrRunner(1).start();
cluster.getJettySolrRunner(2).start();
cluster.waitForNode(j1, 30);
cluster.waitForNode(j2, 30);
// the meat of the test -- wait to see if a different replica becomes the leader
// the correct behavior is that this should time out; if it succeeds we have a problem...
expectThrows(TimeoutException.class,
"Did not time out waiting for new leader, out of sync replica became leader",
() -> {
cluster.getSolrClient().waitForState(collection, 10, TimeUnit.SECONDS, (state) -> {
Replica newLeader = state.getSlice("shard1").getLeader();
if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
// this is the bad case: the "bad" state was found before the timeout
log.error("WTF: New Leader={}", newLeader);
return true;
}
return false; // still no bad state, wait for timeout
});
});
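// Restart the former leader; it holds the extra document and should reclaim leadership.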
JettySolrRunner j0 = cluster.getJettySolrRunner(0);
j0.start();
cluster.waitForNode(j0, 30);
// waitForNode not solid yet?
cluster.waitForAllNodes(30);
waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
Replica newLeader = collectionState.getLeader("shard1");
return newLeader != null && newLeader.getName().equals(leader.getName());
});
waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3));
}
/**
* Adds a doc while the non-leader replicas are network-partitioned from the leader.
* Those replicas are then out-of-sync, so they must not become leader even when the current leader goes DOWN.
* The leader is expected to be on node 0.
*/
private void addDocWhenOtherReplicasAreNetworkPartitioned(String collection, Replica leader, int docId) throws Exception {
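// Close every node's proxy so the replicas are cut off from one another.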
for (int i = 0; i < 3; i++) {
proxies.get(cluster.getJettySolrRunner(i)).close();
}
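// Send a document directly to the leader's node; the partitioned followers never receive it.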
addDoc(collection, docId, cluster.getJettySolrRunner(0));
JettySolrRunner leaderJetty = cluster.getJettySolrRunner(0);
leaderJetty.stop();
cluster.waitForJettyToStop(leaderJetty);
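// Heal the partition for the followers while the former leader stays down.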
for (int i = 1; i < 3; i++) {
proxies.get(cluster.getJettySolrRunner(i)).reopen();
}
waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState)
-> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
// the meat of the test -- wait to see if a different replica becomes the leader
// the correct behavior is that this should time out; if it succeeds we have a problem...
expectThrows(TimeoutException.class,
"Did not time out waiting for new leader, out of sync replica became leader",
() -> {
cluster.getSolrClient().waitForState(collection, 10, TimeUnit.SECONDS, (state) -> {
Replica newLeader = state.getSlice("shard1").getLeader();
if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
// this is the bad case: the "bad" state was found before the timeout
log.error("WTF: New Leader={}", newLeader);
return true;
}
return false; // still no bad state, wait for timeout
});
});
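// Restore the former leader's connectivity and restart it; it should reclaim leadership.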
proxies.get(cluster.getJettySolrRunner(0)).reopen();
cluster.getJettySolrRunner(0).start();
cluster.waitForAllNodes(30);
waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
Replica newLeader = collectionState.getLeader("shard1");
return newLeader != null && newLeader.getName().equals(leader.getName());
});
waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3));
cluster.waitForActiveCollection(collection, 1, 3);
}
private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
List<SolrInputDocument> docs = new ArrayList<>(numDocs);
for (int i = 0; i < numDocs; i++) {
int id = startId + i;
docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id)));
}
cluster.getSolrClient().add(collection, docs);
cluster.getSolrClient().commit(collection);
}
private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId)));
solrClient.commit(collection);
}
}
private void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName, int firstDocId, int lastDocId) throws Exception {
Replica leader =
cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName);
List<HttpSolrClient> replicas = new ArrayList<>(notLeaders.size());
for (Replica r : notLeaders) {
replicas.add(getHttpSolrClient(r, testCollectionName));
}
try {
for (int d = firstDocId; d <= lastDocId; d++) {
String docId = String.valueOf(d);
assertDocExists(leaderSolr, testCollectionName, docId);
for (HttpSolrClient replicaSolr : replicas) {
assertDocExists(replicaSolr, testCollectionName, docId);
}
}
} finally {
if (leaderSolr != null) {
leaderSolr.close();
}
for (HttpSolrClient replicaSolr : replicas) {
replicaSolr.close();
}
}
}
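/**
* Verifies, via a non-distributed real-time get against the given client, that the document is present on that core.
*/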
private void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
@SuppressWarnings({"rawtypes"})
NamedList rsp = realTimeGetDocId(solr, docId);
String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId);
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+ " due to: " + match + "; rsp="+rsp, match == null);
}
@SuppressWarnings({"rawtypes"})
private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
return solr.request(qr);
}
protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
String url = zkProps.getBaseUrl() + "/" + coll;
return getHttpSolrClient(url);
}
}