blob: 271bf678766f55db25555d44cf2e1419fff83bd1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.cdcr;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
 * {@link org.apache.solr.handler.IndexFetcher}: a follower that missed updates while down
 * must recover the exact tlog files (not just the documents) from the shard leader.
 */
@Nightly
public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @Override
  public void distribSetUp() throws Exception {
    schemaString = "schema15.xml"; // we need a string id
    createTargetCollection = false; // we do not need the target cluster
    shardCount = 1; // we need only one shard
    // we need a persistent directory, otherwise the UpdateHandler will erase existing tlog files after restarting a node
    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
    super.distribSetUp();
  }

  /**
   * Indexes {@code numBatches} consecutive batches of {@code batchSize} documents into the
   * source collection. Document ids are sequential integers; batch {@code i} covers ids
   * {@code [i * batchSize, (i + 1) * batchSize)}, so batches indexed with the same
   * {@code batchSize} never overlap.
   *
   * @param firstBatch index of the first batch to send
   * @param numBatches how many batches to send
   * @param batchSize  number of documents per batch (and per resulting tlog file)
   */
  private void indexBatches(int firstBatch, int numBatches, int batchSize) throws Exception {
    for (int i = firstBatch; i < firstBatch + numBatches; i++) {
      List<SolrInputDocument> docs = new ArrayList<>(batchSize);
      for (int j = i * batchSize; j < (i * batchSize) + batchSize; j++) {
        docs.add(getDoc(id, Integer.toString(j)));
      }
      index(SOURCE_COLLECTION, docs);
    }
  }

  /**
   * Test the scenario where the follower is killed from the start. The replication
   * strategy should fetch all the missing tlog files from the leader.
   */
  @Test
  @ShardsFixed(num = 2)
  public void testFullReplication() throws Exception {
    List<CloudJettyRunner> followers = this.getShardToFollowerJetty(SOURCE_COLLECTION, SHARD1);
    followers.get(0).jetty.stop();

    // 10 batches of 10 docs -> 10 tlog files on the leader, all missing on the follower
    indexBatches(0, 10, 10);

    assertNumDocs(100, SOURCE_COLLECTION);

    // Restart the follower node to trigger Replication strategy
    this.restartServer(followers.get(0));

    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
  }

  /**
   * Test the scenario where the follower is killed before receiving all the documents. The replication
   * strategy should fetch all the missing tlog files from the leader.
   */
  @Test
  @ShardsFixed(num = 2)
  public void testPartialReplication() throws Exception {
    // first 5 batches are received by both leader and follower
    indexBatches(0, 5, 20);

    List<CloudJettyRunner> followers = this.getShardToFollowerJetty(SOURCE_COLLECTION, SHARD1);
    followers.get(0).jetty.stop();

    // last 5 batches are only received by the leader
    indexBatches(5, 5, 20);

    assertNumDocs(200, SOURCE_COLLECTION);

    // Restart the follower node to trigger Replication strategy
    this.restartServer(followers.get(0));

    // at this stage, the follower should have replicated the 5 missing tlog files
    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
  }

  /**
   * Test the scenario where the follower is killed before receiving a commit. This creates a truncated tlog
   * file on the follower node. The replication strategy should detect this truncated file, and fetch the
   * non-truncated file from the leader.
   */
  @Test
  @ShardsFixed(num = 2)
  public void testPartialReplicationWithTruncatedTlog() throws Exception {
    List<CloudJettyRunner> followers = this.getShardToFollowerJetty(SOURCE_COLLECTION, SHARD1);
    // documents are added one by one (not batched) so that the follower can die mid-batch
    try (CloudSolrClient client = createCloudClient(SOURCE_COLLECTION)) {
      for (int i = 0; i < 10; i++) {
        for (int j = i * 20; j < (i * 20) + 20; j++) {
          client.add(getDoc(id, Integer.toString(j)));
          // Stop the follower in the middle of a batch to create a truncated tlog on the follower
          if (j == 45) {
            followers.get(0).jetty.stop();
          }
        }
        commit(SOURCE_COLLECTION);
      }
    }

    assertNumDocs(200, SOURCE_COLLECTION);

    // Restart the follower node to trigger Replication recovery
    this.restartServer(followers.get(0));

    // at this stage, the follower should have replicated the 5 missing tlog files
    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 10);
  }

  /**
   * Test the scenario where the follower first recovered with a PeerSync strategy, then with a Replication strategy.
   * The PeerSync strategy will generate a single tlog file for all the missing updates on the follower node.
   * If a Replication strategy occurs at a later stage, it should remove this tlog file generated by PeerSync
   * and fetch the corresponding tlog files from the leader.
   */
  @Test
  @ShardsFixed(num = 2)
  public void testPartialReplicationAfterPeerSync() throws Exception {
    // first 5 batches are received by both leader and follower
    indexBatches(0, 5, 10);

    List<CloudJettyRunner> followers = this.getShardToFollowerJetty(SOURCE_COLLECTION, SHARD1);
    followers.get(0).jetty.stop();

    // next 5 small batches are only received by the leader; the gap stays within the
    // PeerSync update window
    indexBatches(5, 5, 10);

    assertNumDocs(100, SOURCE_COLLECTION);

    // Restart the follower node to trigger PeerSync recovery
    // (the update windows between leader and follower is small enough)
    this.restartServer(followers.get(0));

    followers.get(0).jetty.stop();

    // larger batches this time, so the follower falls outside the PeerSync window
    indexBatches(10, 5, 20);

    // restart the follower node to trigger Replication recovery
    this.restartServer(followers.get(0));

    // at this stage, the follower should have replicated the 5 missing tlog files
    this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
  }

  /**
   * Test the scenario where the follower is killed while the leader is still receiving updates.
   * The follower should buffer updates while in recovery, then replay them at the end of the recovery.
   * If updates were properly buffered and replayed, then the follower should have the same number of documents
   * than the leader. This checks if cdcr tlog replication interferes with buffered updates - SOLR-8263.
   */
  @Test
  @ShardsFixed(num = 2)
  public void testReplicationWithBufferedUpdates() throws Exception {
    List<CloudJettyRunner> followers = this.getShardToFollowerJetty(SOURCE_COLLECTION, SHARD1);

    AtomicInteger numDocs = new AtomicInteger(0);
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new SolrNamedThreadFactory("cdcr-test-update-scheduler"));
    executor.scheduleWithFixedDelay(new UpdateThread(numDocs), 10, 10, TimeUnit.MILLISECONDS);

    // Restart the follower node to trigger Replication strategy
    this.restartServer(followers.get(0));

    // shutdown the update thread and wait for its completion; force-cancel any in-flight
    // task if it overruns the grace period, so the doc-count assertions below cannot race
    // against a batch that is still being indexed
    executor.shutdown();
    if (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
      executor.shutdownNow();
    }

    // check that we have the expected number of documents in the cluster
    assertNumDocs(numDocs.get(), SOURCE_COLLECTION);

    // check that we have the expected number of documents on the follower
    assertNumDocs(numDocs.get(), followers.get(0));
  }

  /**
   * Polls the given node (for up to 15 seconds) until it reports the expected number of
   * documents, failing with the last assertion error if it never does.
   */
  private void assertNumDocs(int expectedNumDocs, CloudJettyRunner jetty)
  throws InterruptedException, IOException, SolrServerException {
    try (SolrClient client = createNewSolrServer(jetty.url)) {
      int cnt = 30; // timeout after 15 seconds
      AssertionError lastAssertionError = null;
      while (cnt > 0) {
        try {
          assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
          return;
        }
        catch (AssertionError e) {
          lastAssertionError = e;
          cnt--;
          Thread.sleep(500);
        }
      }
      throw new AssertionError("Timeout while trying to assert number of documents @ " + jetty.url, lastAssertionError);
    }
  }

  /**
   * Scheduled task that indexes a batch of 10 sequential documents on each execution and
   * advances the shared counter. NOTE: an exception thrown from {@link #run()} silently
   * cancels all subsequent scheduled executions (per ScheduledExecutorService contract),
   * which is acceptable here since the test then fails on the document-count assertions.
   */
  private class UpdateThread implements Runnable {

    // shared with the test method; read to compute the next id range, advanced after each batch
    private final AtomicInteger numDocs;

    private UpdateThread(AtomicInteger numDocs) {
      this.numDocs = numDocs;
    }

    @Override
    public void run() {
      try {
        List<SolrInputDocument> docs = new ArrayList<>();
        for (int j = numDocs.get(); j < (numDocs.get() + 10); j++) {
          docs.add(getDoc(id, Integer.toString(j)));
        }
        index(SOURCE_COLLECTION, docs);
        numDocs.getAndAdd(10);
        if (log.isInfoEnabled()) {
          log.info("Sent batch of {} updates - numDocs:{}", docs.size(), numDocs);
        }
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

  }

  /**
   * Returns the non-leader (follower) jetty runners of the given shard.
   */
  private List<CloudJettyRunner> getShardToFollowerJetty(String collection, String shard) {
    List<CloudJettyRunner> jetties = new ArrayList<>(shardToJetty.get(collection).get(shard));
    CloudJettyRunner leader = shardToLeaderJetty.get(collection).get(shard);
    jetties.remove(leader);
    return jetties;
  }

  /**
   * Asserts that the update logs are in sync between the leader and follower. The leader and the followers
   * must have identical tlog files.
   */
  protected void assertUpdateLogsEquals(String collection, int numberOfTLogs) throws Exception {
    CollectionInfo info = collectInfo(collection);
    Map<String, List<CollectionInfo.CoreInfo>> shardToCoresMap = info.getShardToCoresMap();

    for (String shard : shardToCoresMap.keySet()) {
      Map<Long, Long> leaderFilesMeta = this.getFilesMeta(info.getLeader(shard).ulogDir);
      Map<Long, Long> followerFilesMeta = this.getFilesMeta(info.getReplicas(shard).get(0).ulogDir);

      assertEquals("Incorrect number of tlog files on the leader", numberOfTLogs, leaderFilesMeta.size());
      assertEquals("Incorrect number of tlog files on the follower", numberOfTLogs, followerFilesMeta.size());

      for (Map.Entry<Long, Long> leaderEntry : leaderFilesMeta.entrySet()) {
        Long leaderFileVersion = leaderEntry.getKey();
        assertTrue("Follower is missing a tlog for version " + leaderFileVersion, followerFilesMeta.containsKey(leaderFileVersion));
        assertEquals("Follower's tlog file size differs for version " + leaderFileVersion, leaderEntry.getValue(), followerFilesMeta.get(leaderFileVersion));
      }
    }
  }

  /**
   * Maps each tlog file in {@code dir} to its size in bytes, keyed by the (absolute value of
   * the) version number encoded after the last '.' in the file name.
   */
  private Map<Long, Long> getFilesMeta(String dir) {
    File file = new File(dir);
    if (!file.isDirectory()) {
      fail("Path to tlog " + dir + " does not exists or it's not a directory.");
    }

    // listFiles() returns null on I/O error even for an existing directory; fail with a
    // diagnostic instead of a NullPointerException
    File[] tlogFiles = file.listFiles();
    assertNotNull("Unable to list tlog files in " + dir, tlogFiles);

    Map<Long, Long> filesMeta = new HashMap<>();
    for (File tlogFile : tlogFiles) {
      filesMeta.put(Math.abs(Long.parseLong(tlogFile.getName().substring(tlogFile.getName().lastIndexOf('.') + 1))), tlogFile.length());
    }
    return filesMeta;
  }

}