/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.math3.primes.Primes;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
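/**
 * Stress test of in-place (docValues) updates in SolrCloud: writer threads issue a random mix
 * of commits, deletes, full updates, and partial ("inc") updates against a 1-shard, 3-replica
 * collection, while reader threads check what Solr returns against an in-memory model of the
 * expected documents.
 */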
@Slow
public class TestStressInPlaceUpdates extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void beforeSuperClass() throws Exception {
schemaString = "schema-inplace-updates.xml";
configString = "solrconfig-tlog.xml";
// sanity check that autocommits are disabled
initCore(configString, schemaString);
assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime);
assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxTime);
assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs);
assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxDocs);
}
public TestStressInPlaceUpdates() {
super();
sliceCount = 1;
fixShardCount(3);
}
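// In-memory model of what should be in Solr: 'model' tracks the latest accepted update per id
// (updated as soon as an add/delete succeeds); 'committedModel' is a snapshot of 'model' taken
// around each commit, so non-realtime queries can be checked against it.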
protected final ConcurrentHashMap<Integer, DocInfo> model = new ConcurrentHashMap<>();
protected Map<Integer, DocInfo> committedModel = new HashMap<>();
protected long snapshotCount;
protected long committedModelClock;
protected int clientIndexUsedForCommit;
protected volatile int lastId;
protected final String field = "val_l";
private void initModel(int ndocs) {
for (int i = 0; i < ndocs; i++) {
// seed versions w/-1 so "from scratch" adds/updates will fail optimistic concurrency checks
// if some other thread beats us to adding the id
model.put(i, new DocInfo(-1L, 0, 0));
}
committedModel.putAll(model);
}
SolrClient leaderClient = null;
@Test
@ShardsFixed(num = 3)
// commented out on: 17-Feb-2019 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
public void stressTest() throws Exception {
waitForRecoveriesToFinish(true);
this.leaderClient = getClientForLeader();
assertNotNull("Couldn't obtain client for the leader of the shard", this.leaderClient);
final int commitPercent = 5 + random().nextInt(20);
final int softCommitPercent = 30 + random().nextInt(75); // what percent of the commits are soft
final int deletePercent = 4 + random().nextInt(25);
final int deleteByQueryPercent = random().nextInt(8);
final int ndocs = atLeast(5);
int nWriteThreads = 5 + random().nextInt(12);
int fullUpdatePercent = 5 + random().nextInt(50);
// query variables
final int percentRealtimeQuery = 75;
// number of cumulative read/write operations by all threads
final AtomicLong operations = new AtomicLong(5000);
int nReadThreads = 5 + random().nextInt(12);
/** // testing
final int commitPercent = 5;
final int softCommitPercent = 100; // what percent of the commits are soft
final int deletePercent = 0;
final int deleteByQueryPercent = 50;
final int ndocs = 10;
int nWriteThreads = 10;
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
// query variables
final int percentRealtimeQuery = 101;
final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
int nReadThreads = 10;
int fullUpdatePercent = 20;
**/
if (log.isInfoEnabled()) {
log.info("{}", Arrays.asList
("commitPercent", commitPercent, "softCommitPercent", softCommitPercent,
"deletePercent", deletePercent, "deleteByQueryPercent", deleteByQueryPercent,
"ndocs", ndocs, "nWriteThreads", nWriteThreads, "percentRealtimeQuery", percentRealtimeQuery,
"operations", operations, "nReadThreads", nReadThreads));
}
initModel(ndocs);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < nWriteThreads; i++) {
Thread thread = new Thread("WRITER" + i) {
Random rand = new Random(random().nextInt());
@Override
public void run() {
try {
while (operations.decrementAndGet() > 0) {
int oper = rand.nextInt(50);
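// oper is drawn from [0,50); the *Percent knobs below carve that range into
// commit / delete / delete-by-query bands, and everything above them falls
// through to an add/update operation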
if (oper < commitPercent) {
Map<Integer, DocInfo> newCommittedModel;
long version;
synchronized (TestStressInPlaceUpdates.this) {
// take a snapshot of the model
// this is safe to do w/o synchronizing on the model because it's a ConcurrentHashMap
newCommittedModel = new HashMap<>(model);
version = snapshotCount++;
int chosenClientIndex = rand.nextInt(clients.size());
if (rand.nextInt(100) < softCommitPercent) {
log.info("softCommit start");
clients.get(chosenClientIndex).commit(true, true, true);
log.info("softCommit end");
} else {
log.info("hardCommit start");
clients.get(chosenClientIndex).commit();
log.info("hardCommit end");
}
// install this model snapshot only if it's newer than the current one
if (version >= committedModelClock) {
if (VERBOSE) {
log.info("installing new committedModel version={}", committedModelClock);
}
clientIndexUsedForCommit = chosenClientIndex;
committedModel = newCommittedModel;
committedModelClock = version;
}
}
continue;
}
int id;
if (rand.nextBoolean()) {
id = rand.nextInt(ndocs);
} else {
id = lastId; // reuse the last ID half of the time to force more race conditions
}
// sometimes set lastId before we actually change the doc, to try to
// uncover more race conditions between writing and reading
boolean before = rand.nextBoolean();
if (before) {
lastId = id;
}
DocInfo info = model.get(id);
// yield after getting the next version to increase the odds of updates happening out of order
if (rand.nextBoolean()) Thread.yield();
if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
final boolean dbq = (oper >= commitPercent + deletePercent);
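// the top deleteByQueryPercent slice of the delete band is executed as a
// delete-by-query on the id; the rest as a regular delete-by-id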
final String delType = dbq ? "DBQ" : "DBI";
log.info("{} id {}: {}", delType, id, info);
Long returnedVersion = null;
try {
returnedVersion = deleteDocAndGetVersion(Integer.toString(id), params("_version_", Long.toString(info.version)), dbq);
log.info("{}: Deleting id={}, version={}. Returned version={}"
, delType, id, info.version, returnedVersion);
} catch (RuntimeException e) {
if (e.getMessage() != null
&& (e.getMessage().contains("version conflict") || e.getMessage().contains("Conflict"))) {
// It's okay for a leader to reject a concurrent request
log.warn("Conflict during {}, rejected id={}, {}", delType, id, e);
returnedVersion = null;
} else {
throw e;
}
}
// only update model if update had no conflict & the version is newer
synchronized (model) {
DocInfo currInfo = model.get(id);
if (null != returnedVersion &&
(Math.abs(returnedVersion.longValue()) > Math.abs(currInfo.version))) {
model.put(id, new DocInfo(returnedVersion.longValue(), 0, 0));
}
}
} else {
int val1 = info.intFieldValue;
long val2 = info.longFieldValue;
int nextVal1 = val1;
long nextVal2 = val2;
int addOper = rand.nextInt(30);
Long returnedVersion;
if (addOper < fullUpdatePercent || info.version <= 0) { // if document was never indexed or was deleted
// FULL UPDATE
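// pick the next prime above val1 so intVal strictly increases, and make longVal an
// exact multiple of it; readers rely on the (longVal % intVal == 0) invariant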
nextVal1 = Primes.nextPrime(val1 + 1);
nextVal2 = nextVal1 * 1000000000L;
try {
returnedVersion = addDocAndGetVersion("id", id, "title_s", "title" + id, "val1_i_dvo", nextVal1, "val2_l_dvo", nextVal2, "_version_", info.version);
log.info("FULL: Writing id={}, val=[{},{}], version={}, Prev was=[{},{}]. Returned version={}"
,id, nextVal1, nextVal2, info.version, val1, val2, returnedVersion);
} catch (RuntimeException e) {
if (e.getMessage() != null
&& (e.getMessage().contains("version conflict") || e.getMessage().contains("Conflict"))) {
// It's okay for a leader to reject a concurrent request
log.warn("Conflict during full update, rejected id={}, {}", id, e);
returnedVersion = null;
} else {
throw e;
}
}
} else {
// PARTIAL
nextVal2 = val2 + val1;
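// atomic update: map("inc", val1) asks Solr to increment val2_l_dvo by val1 on the server;
// since the field is docValues-only (per schema-inplace-updates.xml) the increment can be
// applied in place, without reindexing the whole document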
try {
returnedVersion = addDocAndGetVersion("id", id, "val2_l_dvo", map("inc", String.valueOf(val1)), "_version_", info.version);
log.info("PARTIAL: Writing id={}, val=[{},{}], version={}, Prev was=[{},{}]. Returned version={}"
,id, nextVal1, nextVal2, info.version, val1, val2, returnedVersion);
} catch (RuntimeException e) {
if (e.getMessage() != null
&& (e.getMessage().contains("version conflict") || e.getMessage().contains("Conflict"))) {
// It's okay for a leader to reject a concurrent request
log.warn("Conflict during partial update, rejected id={}, {}", id, e);
} else if (e.getMessage() != null && e.getMessage().contains("Document not found for update.")
&& e.getMessage().contains("id="+id)) {
log.warn("Attempted a partial update for a recently deleted document, rejected id={}, {}", id, e);
} else {
throw e;
}
returnedVersion = null;
}
}
// only update model if update had no conflict & the version is newer
synchronized (model) {
DocInfo currInfo = model.get(id);
if (null != returnedVersion &&
(Math.abs(returnedVersion.longValue()) > Math.abs(currInfo.version))) {
model.put(id, new DocInfo(returnedVersion.longValue(), nextVal1, nextVal2));
}
}
}
if (!before) {
lastId = id;
}
}
} catch (Throwable e) {
operations.set(-1L);
log.error("", e);
throw new RuntimeException(e);
}
}
};
threads.add(thread);
}
// Read threads
for (int i = 0; i < nReadThreads; i++) {
Thread thread = new Thread("READER" + i) {
Random rand = new Random(random().nextInt());
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
while (operations.decrementAndGet() >= 0) {
// bias toward a recently changed doc
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
// when indexing, we update the index, then the model
// so when querying, we should first check the model, and then the index
boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
DocInfo expected;
if (realTime) {
expected = model.get(id);
} else {
synchronized (TestStressInPlaceUpdates.this) {
expected = committedModel.get(id);
}
}
if (VERBOSE) {
log.info("querying id {}", id);
}
ModifiableSolrParams params = new ModifiableSolrParams();
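// realtime reads use the /get (RealTimeGet) handler, which can serve uncommitted updates
// straight from the update log; regular queries only see the last (soft)commit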
if (realTime) {
params.set("wt", "json");
params.set("qt", "/get");
params.set("ids", Integer.toString(id));
} else {
params.set("wt", "json");
params.set("q", "id:" + Integer.toString(id));
params.set("omitHeader", "true");
}
int clientId = rand.nextInt(clients.size());
if (!realTime) clientId = clientIndexUsedForCommit;
QueryResponse response = clients.get(clientId).query(params);
if (response.getResults().size() == 0) {
// there's no info we can get back with a delete, so not much we can check without further synchronization
} else if (response.getResults().size() == 1) {
final SolrDocument actual = response.getResults().get(0);
final String msg = "Realtime=" + realTime + ", expected=" + expected + ", actual=" + actual;
assertNotNull(msg, actual);
final Long foundVersion = (Long) actual.getFieldValue("_version_");
assertNotNull(msg, foundVersion);
assertTrue(msg + "... solr doc has non-positive version???",
0 < foundVersion.longValue());
final Integer intVal = (Integer) actual.getFieldValue("val1_i_dvo");
assertNotNull(msg, intVal);
final Long longVal = (Long) actual.getFieldValue("val2_l_dvo");
assertNotNull(msg, longVal);
assertTrue(msg + " ...solr returned older version then model. " +
"should not be possible given the order of operations in writer threads",
Math.abs(expected.version) <= foundVersion.longValue());
if (foundVersion.longValue() == expected.version) {
assertEquals(msg, expected.intFieldValue, intVal.intValue());
assertEquals(msg, expected.longFieldValue, longVal.longValue());
}
// Some things we can assert about any doc returned from solr,
// even if it's newer than our (expected) model information...
assertTrue(msg + " ...how did a doc in solr get a non-positive intVal?",
0 < intVal);
assertTrue(msg + " ...how did a doc in solr get a non-positive longVal?",
0 < longVal);
assertEquals(msg + " ...intVal and longVal in solr doc are internally (modulo) inconsistent with each other",
0, (longVal % intVal));
// NOTE: when foundVersion is greater than the version read from the model,
// it's not possible to make any assertions about the field values in solr relative to the
// field values in the model -- ie: we can *NOT* assert expected.longFieldVal <= doc.longVal
//
// it's tempting to think that this would be possible if we changed our model to preserve the
// "old" values when doing a delete, but that's still no guarantee because of how optimistic
// concurrency works with negative versions: when adding a doc, we can assert that it must not
// exist with version<0, but we can't assert that the *reason* it doesn't exist was because of
// a delete with the specific version of "-42".
// So a writer thread might (1) prep to add a doc for the first time with "intValue=1,_version_=-1",
// and that add may succeed and (2) return some version X which is put in the model. But
// in between #1 and #2 other threads may have added & deleted the doc repeatedly, updating
// the model with intValue=7,_version_=-42, and a reader thread might meanwhile read from the
// model before #2 and expect intValue=7, but get intValue=1 from solr (with a greater version)
} else {
fail(String.format(Locale.ENGLISH, "There was more than one result: %s", response));
}
}
} catch (Throwable e) {
operations.set(-1L);
log.error("", e);
throw new RuntimeException(e);
}
}
};
threads.add(thread);
}
// Start all threads
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
{ // final pass over uncommitted model with RTG
for (SolrClient client : clients) {
for (Map.Entry<Integer,DocInfo> entry : model.entrySet()) {
final Integer id = entry.getKey();
final DocInfo expected = entry.getValue();
final SolrDocument actual = client.getById(id.toString());
String msg = "RTG: " + id + "=" + expected;
if (null == actual) {
// a deleted or non-existent document
// sanity check of the model agrees...
assertTrue(msg + " is deleted/non-existent in Solr, but model has non-neg version",
expected.version < 0);
assertEquals(msg + " is deleted/non-existent in Solr", expected.intFieldValue, 0);
assertEquals(msg + " is deleted/non-existent in Solr", expected.longFieldValue, 0);
} else {
msg = msg + " <==VS==> " + actual;
assertEquals(msg, expected.intFieldValue, actual.getFieldValue("val1_i_dvo"));
assertEquals(msg, expected.longFieldValue, actual.getFieldValue("val2_l_dvo"));
assertEquals(msg, expected.version, actual.getFieldValue("_version_"));
assertTrue(msg + " doc exists in solr, but version is negative???",
0 < expected.version);
}
}
}
}
{ // do a final search and compare every result with the model
// because commits don't provide any sort of concrete versioning (or optimistic concurrency constraints)
// there's no way to guarantee that our committedModel matches what was in Solr at the time of the last commit.
// It's possible other threads made additional writes to solr before the commit was processed, but after
// the committedModel variable was assigned its new value.
//
// what we can do however, is commit all completed updates, and *then* compare solr search results
// against the (new) committed model....
waitForThingsToLevelOut(30); // NOTE: this does an automatic commit for us & ensures replicas are up to date
committedModel = new HashMap<>(model);
// first, prune the model of any docs that have negative versions
// ie: were never actually added, or were ultimately deleted.
for (int i = 0; i < ndocs; i++) {
DocInfo info = committedModel.get(i);
if (info.version < 0) {
// first, a quick sanity check of the model itself...
assertEquals("Inconsistent int value in model for deleted doc" + i + "=" + info,
0, info.intFieldValue);
assertEquals("Inconsistent long value in model for deleted doc" + i + "=" + info,
0L, info.longFieldValue);
committedModel.remove(i);
}
}
for (SolrClient client : clients) {
QueryResponse rsp = client.query(params("q","*:*", "sort", "id asc", "rows", ndocs+""));
for (SolrDocument actual : rsp.getResults()) {
final Integer id = Integer.parseInt(actual.getFieldValue("id").toString());
final DocInfo expected = committedModel.get(id);
assertNotNull("Doc found but missing/deleted from model: " + actual, expected);
final String msg = "Search: " + id + "=" + expected + " <==VS==> " + actual;
assertEquals(msg, expected.intFieldValue, actual.getFieldValue("val1_i_dvo"));
assertEquals(msg, expected.longFieldValue, actual.getFieldValue("val2_l_dvo"));
assertEquals(msg, expected.version, actual.getFieldValue("_version_"));
assertTrue(msg + " doc exists in solr, but version is negative???",
0 < expected.version);
// also sanity check the model (which we already know matches the doc)
assertEquals("Inconsistent (modulo) values in model for id " + id + "=" + expected,
0, (expected.longFieldValue % expected.intFieldValue));
}
assertEquals(committedModel.size(), rsp.getResults().getNumFound());
}
}
}
/**
* Used for storing the info for a document in an in-memory model.
*/
private static class DocInfo {
long version;
int intFieldValue;
long longFieldValue;
public DocInfo(long version, int val1, long val2) {
assert version != 0; // must either be real positive version, or negative deleted version/indicator
this.version = version;
this.intFieldValue = val1;
this.longFieldValue = val2;
}
@Override
public String toString() {
return "[version=" + version + ", intValue=" + intFieldValue + ",longValue=" + longFieldValue + "]";
}
}
@SuppressWarnings("rawtypes")
protected long addDocAndGetVersion(Object... fields) throws Exception {
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, fields);
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("versions", "true");
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(params);
ureq.add(doc);
UpdateResponse resp;
// send updates to leader, to avoid SOLR-8733
resp = ureq.process(leaderClient);
long returnedVersion = Long.parseLong(((NamedList) resp.getResponse().get("adds")).getVal(0).toString());
assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
+ " worked around that problem here.", returnedVersion > 0);
return returnedVersion;
}
@SuppressWarnings("rawtypes")
protected long deleteDocAndGetVersion(String id, ModifiableSolrParams params, boolean deleteByQuery) throws Exception {
params.add("versions", "true");
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(params);
if (deleteByQuery) {
ureq.deleteByQuery("id:"+id);
} else {
ureq.deleteById(id);
}
UpdateResponse resp;
// send updates to leader, to avoid SOLR-8733
resp = ureq.process(leaderClient);
String key = deleteByQuery ? "deleteByQuery" : "deletes";
long returnedVersion = Long.parseLong(((NamedList) resp.getResponse().get(key)).getVal(0).toString());
assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
+ " worked around that problem here.", returnedVersion < 0);
return returnedVersion;
}
/**
* Method gets the SolrClient for the leader replica. This is needed for a workaround for SOLR-8733.
*/
public SolrClient getClientForLeader() throws KeeperException, InterruptedException {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
ClusterState clusterState = zkStateReader.getClusterState();
Slice shard1 = clusterState.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1);
Replica leader = shard1.getLeader();
String leaderBaseUrl = zkStateReader.getBaseUrlForNodeName(leader.getNodeName());
for (int i = 0; i < clients.size(); i++) {
if (((HttpSolrClient) clients.get(i)).getBaseURL().startsWith(leaderBaseUrl))
return clients.get(i);
}
return null;
}
}