/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.store.shared;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
/**
* Tests around synchronization of concurrent indexing, pushes and pulls
* happening on a core of a shared collection ({@link DocCollection#getSharedIndex()}).
*/
public class SharedCoreConcurrencyTest extends SolrCloudSharedStoreTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION_NAME = "sharedCollection";
private static final String SHARD_NAME = "shard1";
/**
* Number of serial indexing iterations for each test. This is the main setting: query and failover iterations
* stop after indexing ends. The higher the value, the longer the tests run.
*/
private static final int INDEXING_ITERATIONS = TEST_NIGHTLY ? 100 : 20;
/**
* Maximum number of concurrent indexing requests per indexing iteration.
*/
private static final int MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION = 10;
/**
* Maximum number of docs per indexing request.
*/
private static final int MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST = 100;
/**
* Indexing can fail because of leader failures (especially when the test {@link #includeFailovers()}).
* The test re-attempts up to this many times before bailing out. For the test to succeed,
* each indexing request has to succeed within these attempts.
*/
private static final int MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST = 10;
/**
* Maximum number of concurrent query requests per query iteration.
*/
private static final int MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION = 10;
/**
* Querying is faster than indexing; to pace it with indexing, a delay is added between query iterations.
*/
private static final int DELAY_MS_BETWEEN_EACH_QUERY_ITERATION = 50;
/**
* Minimum time between each failover.
*/
private static final int DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION = 500;
/**
* Manages test state from start to end.
*/
private TestState testState;
@Before
public void setupTest() throws Exception {
int numNodes = 4;
setupCluster(numNodes);
testState = new TestState();
setupSolrNodesForTest();
int maxShardsPerNode = 1;
// One less than the number of nodes.
// The extra node will be used at the end of the test to verify
// the contents of the shared store by querying for all docs on a new replica.
int numReplicas = numNodes - 1;
// Later on we can consider choosing a random number of shards and replicas.
// To handle multiple shards, we would need to update the code where SHARD_NAME is used.
setupSharedCollectionWithShardNames(COLLECTION_NAME, maxShardsPerNode, numReplicas, SHARD_NAME);
}
@After
public void teardownTest() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Tests that concurrent indexing succeeds.
*/
@Test
public void testIndexing() throws Exception {
final boolean includeDeletes = false;
includeIndexing(includeDeletes);
run();
}
/**
* Tests that concurrent indexing with concurrent queries succeeds.
*/
@Test
public void testIndexingQueries() throws Exception {
final boolean includeDeletes = false;
includeIndexing(includeDeletes);
includeQueries();
run();
}
/**
* Tests that concurrent indexing with deletes and concurrent queries succeeds.
*/
@Test
public void testIndexingQueriesDeletes() throws Exception {
final boolean includeDeletes = true;
includeIndexing(includeDeletes);
includeQueries();
run();
}
/**
* Tests that concurrent indexing with deletes, concurrent queries and explicit failovers succeeds.
*/
// @Test
// TODO: This test flaps from time to time. The symptom of the failure is missing docs, i.e. indexing is declared
// successful but a query cannot retrieve all of the docs. I was able to repro this with an NRT collection on vanilla
// 8.3 too. I have not root caused it yet. Keeping this test disabled until the problem is root caused and fixed.
public void todoTestIndexingQueriesDeletesFailovers() throws Exception {
final boolean includeDeletes = true;
includeIndexing(includeDeletes);
includeQueries();
includeFailovers();
run();
}
/**
* Starts all the threads that are included in the test (indexing, queries and failovers) in parallel,
* then waits for them to finish (run length depends on {@link #INDEXING_ITERATIONS}).
* At the end it makes sure that no critical section was breached and no unexpected error occurred,
* then verifies the contents of the shared store by querying for all docs on a new replica.
*/
private void run() throws Exception {
testState.startIncludedThreads();
testState.waitForThreadsToStop();
analyzeCoreConcurrencyStagesForBreaches();
testState.checkErrors();
Replica newReplica = addReplica();
queryNewReplicaAndVerifyAllDocsFound(newReplica);
}
/**
* Adds a thread to the test that runs {@link #INDEXING_ITERATIONS} iterations or until it is interrupted.
* In each iteration it creates between 1 and {@link #MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION} threads
* by calling {@link #createIndexingThreads(int, int, boolean)}, starts them concurrently and waits for them to finish
* before going to the next iteration. Each indexing thread adds between 1 and {@link #MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST}
* docs.
*
* @param includeDeletes whether to randomly mark some docs for deletion and delete them in subsequent indexing
*                       requests
*/
private void includeIndexing(boolean includeDeletes) {
Thread t = new Thread(() -> {
try {
for (int i = 0; i < INDEXING_ITERATIONS && !testState.stopRunning.get(); i++) {
int numIndexingThreads = random().nextInt(MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION) + 1;
int numDocsToAddPerThread = random().nextInt(MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST) + 1;
Thread[] indexingThreads = createIndexingThreads(numIndexingThreads, numDocsToAddPerThread, includeDeletes);
for (int j = 0; j < numIndexingThreads; j++) {
indexingThreads[j].start();
}
for (int j = 0; j < numIndexingThreads; j++) {
indexingThreads[j].join();
}
if (Thread.interrupted()) {
// we have been interrupted so we will stop running
testState.stopRunning.set(true);
}
}
} catch (Exception ex) {
testState.indexingErrors.add(ex.getMessage());
}
// everything else stops running when indexing finishes
testState.stopRunning.set(true);
});
testState.includeThread(t);
}
/**
* Creates {@code numIndexingThreads} threads, each adding {@code numDocsToAddPerThread} docs.
*
* @param includeDeletes whether to randomly mark some docs for deletion and delete them in subsequent indexing
*                       requests
*/
private Thread[] createIndexingThreads(int numIndexingThreads, int numDocsToAddPerThread, boolean includeDeletes) throws Exception {
log.info("numIndexingThreads=" + numIndexingThreads);
Thread[] indexingThreads = new Thread[numIndexingThreads];
for (int i = 0; i < numIndexingThreads && !testState.stopRunning.get(); i++) {
indexingThreads[i] = new Thread(() -> {
List<String> idsToAdd = new ArrayList<>();
// prepare the list of docs to add and delete outside the reattempt loop
for (int j = 0; j < numDocsToAddPerThread; j++) {
String docId = Integer.toString(testState.docIdGenerator.incrementAndGet());
idsToAdd.add(docId);
}
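// pick up a batch of previously indexed ids to delete with this request, if one is available (may be null)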
List<String> idsToDelete = testState.idBatchesToDelete.poll();
// attempt until succeeded or max attempts
for (int j = 0; j < MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST; j++) {
try {
String message = "attempt=" + (j + 1) + " numDocsToAdd=" + numDocsToAddPerThread + " docsToAdd=" + idsToAdd.toString();
if (idsToDelete != null) {
message += " numDocsToDelete=" + idsToDelete.size() + " docsToDelete=" + idsToDelete.toString();
}
log.info(message);
UpdateRequest updateReq = new UpdateRequest();
for (int k = 0; k < idsToAdd.size(); k++) {
updateReq.add("id", idsToAdd.get(k));
}
if (includeDeletes && idsToDelete != null) {
updateReq.deleteById(idsToDelete);
}
processUpdateRequest(updateReq);
testState.numDocsIndexed.addAndGet(numDocsToAddPerThread);
if (idsToDelete != null) {
testState.idsDeleted.addAll(idsToDelete);
}
// randomly select some docs that can be deleted
if (includeDeletes) {
List<String> idsThatCanBeDeleted = new ArrayList<>();
for (String indexedId : idsToAdd) {
if (random().nextBoolean()) {
idsThatCanBeDeleted.add(indexedId);
}
}
if (!idsThatCanBeDeleted.isEmpty()) {
testState.idBatchesToDelete.offer(idsThatCanBeDeleted);
}
}
// indexing was successful, stop attempting
break;
} catch (Exception ex) {
// if the last attempt also failed, record the error
if (j == MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST - 1) {
testState.indexingErrors.add(Throwables.getStackTraceAsString(ex));
}
}
}
});
}
return indexingThreads;
}
/**
* Sends an update request to the server, randomly choosing whether to send it with commit=true or not.
* A shared replica does not need an explicit commit since it always does an implicit hard commit, but
* it is still valid to send an update with or without a commit; therefore we test both.
*/
private void processUpdateRequest(UpdateRequest request) throws Exception {
UpdateResponse response = random().nextBoolean()
? request.process(cluster.getSolrClient(), COLLECTION_NAME)
: request.commit(cluster.getSolrClient(), COLLECTION_NAME);
if (response.getStatus() != 0) {
throw new RuntimeException("Update request failed with status=" + response.getStatus());
}
}
/**
* Adds a thread to the test that iterates until the test is stopped ({@link TestState#stopRunning}).
* In each iteration it creates between 1 and {@link #MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION} threads
* by calling {@link #createQueryThreads(int)}, starts them concurrently and waits for them to finish
* before going to the next iteration. To pace it with indexing, a {@link #DELAY_MS_BETWEEN_EACH_QUERY_ITERATION}
* delay is added between query iterations.
*/
private void includeQueries() throws Exception {
Thread t = new Thread(() -> {
try {
while (!testState.stopRunning.get()) {
int numQueryThreads = random().nextInt(MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION) + 1;
Thread[] queryThreads = createQueryThreads(numQueryThreads);
// thread creation stops early if the test is stopped concurrently, which can leave null entries
for (int j = 0; j < numQueryThreads; j++) {
if (queryThreads[j] != null) {
queryThreads[j].start();
}
}
for (int j = 0; j < numQueryThreads; j++) {
if (queryThreads[j] != null) {
queryThreads[j].join();
}
}
Thread.sleep(DELAY_MS_BETWEEN_EACH_QUERY_ITERATION);
}
} catch (Exception ex) {
testState.queryErrors.add(ex.getMessage());
}
});
testState.includeThread(t);
}
/**
* Creates {@code numQueryThreads} threads, each querying all docs with {@code "*:*"}.
*/
private Thread[] createQueryThreads(int numQueryThreads) throws Exception {
log.info("numQueryThreads=" + numQueryThreads);
Thread[] queryThreads = new Thread[numQueryThreads];
for (int i = 0; i < numQueryThreads && !testState.stopRunning.get(); i++) {
queryThreads[i] = new Thread(() -> {
try {
/**
* We don't have a way to ensure freshness of results yet. When we add something for query freshness later
* we may use it here.
*
* {@link SolrProcessTracker#corePullTracker} cannot help in concurrent query scenarios since there
* is no one-to-one guarantee between a query and an async pull.
*/
cluster.getSolrClient().query(COLLECTION_NAME, new ModifiableSolrParams().set("q", "*:*"));
} catch (Exception ex) {
testState.queryErrors.add(Throwables.getStackTraceAsString(ex));
}
});
}
return queryThreads;
}
/**
* Adds a thread to the test that iterates until the test is stopped ({@link TestState#stopRunning}).
* In each iteration it fails over to a new leader by calling {@link #failOver()}, waiting
* {@link #DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION} between iterations.
*/
private void includeFailovers() throws Exception {
Thread t = new Thread(() -> {
try {
while (!testState.stopRunning.get()) {
failOver();
Thread.sleep(DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION);
}
} catch (Exception ex) {
testState.failoverError = Throwables.getStackTraceAsString(ex);
}
});
testState.includeThread(t);
}
/**
* Kills the current leader, waits for a new leader to be elected, and then brings the killed leader back up
* as a follower replica. Before bringing the replica back up, it randomly decides whether to delete its core
* directory.
*/
private void failOver() throws Exception {
DocCollection collection = getCollection();
Replica leaderReplicaBeforeSwitch = collection.getLeader(SHARD_NAME);
final String leaderReplicaNameBeforeSwitch = leaderReplicaBeforeSwitch.getName();
JettySolrRunner shardLeaderSolrRunnerBeforeSwitch = cluster.getReplicaJetty(leaderReplicaBeforeSwitch);
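// capture the leader's index directory so it can optionally be deleted while the node is down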
File leaderIndexDirBeforeSwitch = new File(shardLeaderSolrRunnerBeforeSwitch.getCoreContainer().getCoreRootDirectory()
+ "/" + leaderReplicaBeforeSwitch.getCoreName());
shardLeaderSolrRunnerBeforeSwitch.stop();
cluster.waitForJettyToStop(shardLeaderSolrRunnerBeforeSwitch);
waitForState("Timed out waiting for new replica to become leader", COLLECTION_NAME, (liveNodes, collectionState) -> {
Slice slice = collectionState.getSlice(SHARD_NAME);
if (slice.getLeader() == null) {
return false;
}
if (slice.getLeader().getName().equals(leaderReplicaNameBeforeSwitch)) {
return false;
}
return true;
});
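// randomly delete the local index directory while the node is down; the restarted core should then
// be re-hydrated entirely from the shared store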
if (random().nextBoolean()) {
FileUtils.deleteDirectory(leaderIndexDirBeforeSwitch);
}
shardLeaderSolrRunnerBeforeSwitch.start();
cluster.waitForNode(shardLeaderSolrRunnerBeforeSwitch, -1);
waitForState("Timed out waiting for restarted replica to become active", COLLECTION_NAME, (liveNodes, collectionState) -> {
Slice slice = collectionState.getSlice(SHARD_NAME);
if (slice.getReplica(leaderReplicaNameBeforeSwitch).getState() != Replica.State.ACTIVE) {
return false;
}
return true;
});
setupSolrProcess(shardLeaderSolrRunnerBeforeSwitch);
}
/**
* Goes over all the lives of a node (a node gets a new life on restart) and then goes over each core's concurrency
* stages in each life. Logs the concurrency stages in the order they occurred and then analyzes those stages to
* make sure no critical section was breached.
*/
private void analyzeCoreConcurrencyStagesForBreaches() {
// Goes over each node
for (Map.Entry<String, List<SolrProcessTracker>> nodeTracker :
testState.solrNodesTracker.entrySet()) {
String nodeName = nodeTracker.getKey();
int lifeCountForNode = nodeTracker.getValue().size();
// Goes over each life of a node
for (int i = 0; i < lifeCountForNode; i++) {
ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> coreConcurrencyStageTracker = nodeTracker.getValue().get(i).coreConcurrencyStageTracker;
if (coreConcurrencyStageTracker.isEmpty()) {
log.info("life " + (i + 1) + "/" + lifeCountForNode + " of node " + nodeName + " is empty");
} else {
// Goes over each core
for (Map.Entry<String, ConcurrentLinkedQueue<String>> coreConcurrencyStagesEntry : coreConcurrencyStageTracker.entrySet()) {
String coreName = coreConcurrencyStagesEntry.getKey();
List<String> coreConcurrencyStages = new ArrayList<>(coreConcurrencyStagesEntry.getValue());
// Log lines are truncated beyond a certain length, therefore print the stages in batches of 200
List<List<String>> batches = Lists.partition(coreConcurrencyStages, 200);
if (batches.isEmpty()) {
batches = new ArrayList<>(1);
batches.add(new ArrayList<>(0));
}
for (int j = 0; j < batches.size(); j++) {
log.info("batch " + (j + 1) + "/" + batches.size()
+ " of core " + coreName
+ " of life " + (i + 1) + "/" + lifeCountForNode
+ " of node " + nodeName
+ "\n" + batches.get(j).toString());
}
analyzeCoreConcurrencyStagesForBreaches(coreName, coreConcurrencyStages);
}
}
}
}
}
/**
* Analyzes a core's concurrency stages to make sure no critical section was breached. Details of those critical
* sections can be found in {@link SharedCoreConcurrencyController}.
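* <p>
* For example (an illustrative sequence; thread names are arbitrary), recorded stages such as
* <pre>
* t1.BLOB_PULL_STARTED, t2.LOCAL_INDEXING_STARTED, t1.BLOB_PULL_FINISHED
* </pre>
* would be flagged as a breach, since indexing started while a pull was still in progress.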
*/
private void analyzeCoreConcurrencyStagesForBreaches(String coreName, List<String> coreConcurrencyStages) {
SharedCoreStage currentStage = null;
int activePullers = 0; // number of threads that have started pulling and not finished
int activeIndexers = 0; // number of threads that have started indexing and not finished
int activePushers = 0; // number of threads that are actively pushing at any given time
for (String s : coreConcurrencyStages) {
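// each stage was recorded as "<threadName>.<stageName>" (see
// configureTestSharedConcurrencyControllerForProcess), so parts[1] is the stage name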
String[] parts = s.split("\\.");
currentStage = SharedCoreStage.valueOf(parts[1]);
if (currentStage == SharedCoreStage.BLOB_PULL_STARTED) {
activePullers++;
} else if (currentStage == SharedCoreStage.BLOB_PULL_FINISHED) {
activePullers--;
} else if (currentStage == SharedCoreStage.LOCAL_INDEXING_STARTED) {
activeIndexers++;
} else if (currentStage == SharedCoreStage.BLOB_PUSH_STARTED) {
activePushers++;
} else if (currentStage == SharedCoreStage.BLOB_PUSH_FINISHED) {
activePushers--;
} else if (currentStage == SharedCoreStage.INDEXING_BATCH_FINISHED) {
activeIndexers--;
}
// making sure no other activity (including another pull) takes place during pull
assertFalse("Pull and indexing are interleaved, coreName=" + coreName + " currentStage=" + s, activePullers > 1 || (activePullers > 0 && (activeIndexers > 0 || activePushers > 0)));
// making sure a push to blob is not disrupted by another push to blob
assertFalse("Blob push breached by another blob push, coreName=" + coreName + " currentStage=" + s, activePushers > 1);
}
}
/**
* Adds a new replica.
*/
private Replica addReplica() throws Exception {
List<String> existingReplicas = getCollection().getSlice(SHARD_NAME).getReplicas().stream().map(r -> r.getName()).collect(Collectors.toList());
// add another replica
assertTrue(CollectionAdminRequest.addReplicaToShard(COLLECTION_NAME, SHARD_NAME, Replica.Type.SHARED)
.process(cluster.getSolrClient()).isSuccess());
// Verify that new replica is created
waitForState("Timed-out waiting for new replica to be created", COLLECTION_NAME, clusterShape(1, existingReplicas.size() + 1));
Replica newReplica = null;
for (Replica r : getCollection().getSlice(SHARD_NAME).getReplicas()) {
if (!existingReplicas.contains(r.getName())) {
newReplica = r;
break;
}
}
assertNotNull("Could not find new replica", newReplica);
return newReplica;
}
/**
* Directly queries a new {@code replica} and verifies that the empty replica is correctly hydrated from the shared
* store with all the indexed docs (after accounting for deletions).
*/
private void queryNewReplicaAndVerifyAllDocsFound(Replica replica) throws Exception {
try (SolrClient replicaDirectClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName())) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("q", "*:*").set("distrib", "false").set("rows", testState.numDocsIndexed.get());
CountDownLatch latch = new CountDownLatch(1);
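// register the latch so the test can wait for the async pull (triggered by the first query below) to complete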
Map<String, CountDownLatch> corePullTracker = testState.getCorePullTracker(replica.getNodeName());
corePullTracker.put(replica.getCoreName(), latch);
QueryResponse resp = replicaDirectClient.query(params);
assertEquals("new replica did not return empty results", 0, resp.getResults().getNumFound());
assertTrue(latch.await(120, TimeUnit.SECONDS));
resp = replicaDirectClient.query(params);
List<String> docs = resp.getResults().stream().map(r -> (String) r.getFieldValue("id")).collect(Collectors.toList());
assertEquals("we did not ask for all the docs found", resp.getResults().getNumFound(), docs.size());
Collections.sort(docs, Comparator.comparingInt(Integer::parseInt));
List<String> docsExpected = new ArrayList<>();
for (int i = 1; i <= testState.numDocsIndexed.get(); i++) {
String docId = Integer.toString(i);
if (!testState.idsDeleted.contains(docId)) {
docsExpected.add(docId);
}
}
log.info("numDocsFound=" + docs.size() + " docsFound= " + docs.toString());
assertEquals("wrong docs", docsExpected.size() + docsExpected.toString(), docs.size() + docs.toString());
}
}
/**
* Sets up all the nodes for the test.
*/
private void setupSolrNodesForTest() throws Exception {
for (JettySolrRunner solrProcess : cluster.getJettySolrRunners()) {
setupSolrProcess(solrProcess);
}
}
/**
* Sets up a solr process for the test (a process is one life of a node; a restart starts a new life).
*/
private void setupSolrProcess(JettySolrRunner solrProcess) throws Exception {
Map<String, CountDownLatch> corePullTracker = configureTestBlobProcessForNode(solrProcess);
ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> coreConcurrencyStagesTracker = new ConcurrentHashMap<>();
configureTestSharedConcurrencyControllerForProcess(solrProcess, coreConcurrencyStagesTracker);
SolrProcessTracker processTracker = new SolrProcessTracker(corePullTracker, coreConcurrencyStagesTracker);
List<SolrProcessTracker> nodeTracker = testState.solrNodesTracker.computeIfAbsent(solrProcess.getNodeName(), k -> new ArrayList<>());
nodeTracker.add(processTracker);
}
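/**
* Returns the current cluster state's view of the shared collection.
*/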
private DocCollection getCollection() {
return cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
}
/**
* Sets up {@link SharedCoreConcurrencyController} for the solr process so we can accumulate the concurrency stages
* a core goes through during the test.
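* <p>
* Each recorded entry has the form {@code <threadName>.<stageName>}, e.g. {@code thread-1.BLOB_PULL_STARTED}
* (thread name illustrative); {@link #analyzeCoreConcurrencyStagesForBreaches(String, List)} later parses
* these entries.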
*/
private void configureTestSharedConcurrencyControllerForProcess(
JettySolrRunner solrProcess, ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> coreConcurrencyStagesMap) {
SharedCoreConcurrencyController concurrencyController = new SharedCoreConcurrencyController() {
@Override
public void recordState(String collectionName, String shardName, String coreName, SharedCoreStage stage) {
super.recordState(collectionName, shardName, coreName, stage);
ConcurrentLinkedQueue<String> coreConcurrencyStages = coreConcurrencyStagesMap.computeIfAbsent(coreName, k -> new ConcurrentLinkedQueue<>());
coreConcurrencyStages.add(Thread.currentThread().getName() + "." + stage.name());
}
};
setupTestSharedConcurrencyControllerForNode(concurrencyController, solrProcess);
}
/**
* Manages state for each test from start to end.
*/
private static class TestState {
/**
* Threads included in the test (indexing, queries and failovers).
*/
private final List<Thread> includedThreads = new ArrayList<>();
/**
* Indicates when to stop. Set to true when indexing either finishes or is interrupted.
*/
private final AtomicBoolean stopRunning = new AtomicBoolean(false);
/**
* Used to provide a unique id to each indexed doc.
*/
private final AtomicInteger docIdGenerator = new AtomicInteger(0);
/**
* At any given moment, the minimum number of docs that have been indexed (does not account for deletions).
*/
private final AtomicInteger numDocsIndexed = new AtomicInteger(0);
/**
* Batches of ids from indexed docs that can be deleted.
*/
private final ConcurrentLinkedQueue<List<String>> idBatchesToDelete = new ConcurrentLinkedQueue<>();
/**
* Ids that have been deleted.
*/
private final ConcurrentLinkedQueue<String> idsDeleted = new ConcurrentLinkedQueue<>();
/**
* Errors from setting up indexing, or those encountered by indexing on the last attempt
* (out of {@link #MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST}) of each batch.
*/
private final ConcurrentLinkedQueue<String> indexingErrors = new ConcurrentLinkedQueue<>();
/**
* Errors from setting up queries, or those encountered by queries.
*/
private final ConcurrentLinkedQueue<String> queryErrors = new ConcurrentLinkedQueue<>();
/**
* Error encountered when failing over to a new leader.
*/
private String failoverError = null;
/**
* Tracks the cores' pull and concurrency stage information for each life of a node (a node gets a new life on restart).
* Key is the node name.
*/
private final Map<String, List<SolrProcessTracker>> solrNodesTracker = new HashMap<>();
/**
* Gets the core pull tracker for the current life of the node.
*/
private Map<String, CountDownLatch> getCorePullTracker(String nodeName) {
List<SolrProcessTracker> allLives = solrNodesTracker.get(nodeName);
return allLives.get(allLives.size() - 1).corePullTracker;
}
/**
* Includes a thread in the test.
*/
private void includeThread(Thread t) {
includedThreads.add(t);
}
/**
* Starts all the included threads.
*/
private void startIncludedThreads() throws Exception {
for (Thread t : includedThreads) {
t.start();
}
}
/**
* Waits for all the included threads to stop.
*/
private void waitForThreadsToStop() throws Exception {
for (Thread t : includedThreads) {
t.join();
}
log.info("docIdGenerator=" + docIdGenerator.get() + " numDocsIndexed=" + numDocsIndexed.get() + " numDocsDeleted=" + idsDeleted.size());
}
/**
* Checks if any error was encountered during the test.
*/
private void checkErrors() {
assertTrue("indexingErrors=\n" + indexingErrors.toString() + "\n"
+ "queryErrors=\n" + queryErrors.toString() + "\n"
+ "failoverError=\n" + failoverError + "\n",
indexingErrors.isEmpty() && queryErrors.isEmpty() && failoverError == null);
}
}
/**
* Tracks cores' pull and concurrency stage information for one life of a node.
*/
private static class SolrProcessTracker {
/**
* Per core pull tracker.
* Key is the core name.
*/
private final Map<String, CountDownLatch> corePullTracker;
/**
* Per core concurrency stage tracker.
* Key is the core name.
*
* For now we only use a single replica per node, therefore there will only be a single core, but this
* should be able to handle multiple replicas per node if the test chooses to set it up that way.
*/
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> coreConcurrencyStageTracker;
private SolrProcessTracker(Map<String, CountDownLatch> corePullTracker,
ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> coreConcurrencyStageTracker) {
this.corePullTracker = corePullTracker;
this.coreConcurrencyStageTracker = coreConcurrencyStageTracker;
}
}
}