blob: b42f9757f8dc33f528eae85e9231a2ccaea54dc5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.SolrTestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.ExpectedErr;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.SolrParams;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.addErr;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.delIErr;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.delQErr;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.f;
import static org.apache.solr.cloud.TestTolerantUpdateProcessorCloud.update;
import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_PARAM;
import static org.apache.solr.common.params.CursorMarkParams.CURSOR_MARK_START;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
* Test of TolerantUpdateProcessor using a randomized MiniSolrCloud.
* Reuses some utility methods in {@link TestTolerantUpdateProcessorCloud}
*
* <p>
* <b>NOTE:</b> This test sets up a static instance of MiniSolrCloud with a single collection
* and several clients pointed at specific nodes. These are all re-used across multiple test methods,
* and assumes that the state of the cluster is healthy between tests.
 * </p>
 */
@SolrTestCase.SuppressSSL(bugUrl="https://issues.apache.org/jira/browse/SOLR-9182 - causes OOM")
public class TestTolerantUpdateProcessorRandomCloud extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String COLLECTION_NAME = "test_col";
/** A basic client for operations at the cloud level, default collection will be set */
private static CloudHttp2SolrClient CLOUD_CLIENT;
/** one HttpSolrClient for each server */
private static List<Http2SolrClient> NODE_CLIENTS;
@BeforeClass
public static void createMiniSolrCloudCluster() throws Exception {
useFactory(null);
final String configName = "solrCloudCollectionConfig";
final File configDir = new File(SolrTestUtil.TEST_HOME() + File.separator + "collection1" + File.separator + "conf");
final int numShards = TestUtil.nextInt(random(), 2, TEST_NIGHTLY ? 5 : 3);
final int repFactor = TestUtil.nextInt(random(), 2, TEST_NIGHTLY ? 5 : 3);
// at least one server won't have any replicas
final int numServers = 1 + (numShards * repFactor);
log.info("Configuring cluster: servers={}, shards={}, repfactor={}", numServers, numShards, repFactor);
configureCluster(numServers)
.addConfig(configName, configDir.toPath())
.configure();
Map<String, String> collectionProperties = new HashMap<>();
collectionProperties.put("config", "solrconfig-distrib-update-processor-chains.xml");
collectionProperties.put("schema", "schema15.xml"); // string id
CLOUD_CLIENT = cluster.getSolrClient();
CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
.setProperties(collectionProperties)
.setMaxShardsPerNode(10)
.process(CLOUD_CLIENT);
if (NODE_CLIENTS != null) {
for (Http2SolrClient client : NODE_CLIENTS) {
client.close();
}
}
NODE_CLIENTS = new ArrayList<Http2SolrClient>(numServers);
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
String jettyURL = jetty.getBaseUrl();
NODE_CLIENTS.add(SolrTestCaseJ4
.getHttpSolrClient(jettyURL.toString() + "/" + COLLECTION_NAME + "/"));
}
assertEquals(numServers, NODE_CLIENTS.size());
}
@After
public void deleteAllDocs() throws Exception {
CLOUD_CLIENT.deleteByQuery("*:*");
CLOUD_CLIENT.commit();
// TODO: this can rarely randomly fail, whats the race?
//assertEquals("index should be empty", 0L, countDocs(CLOUD_CLIENT));
}
@AfterClass
public static void afterClass() throws Exception {
if (NODE_CLIENTS != null) {
for (Http2SolrClient client : NODE_CLIENTS) {
client.close();
}
}
shutdownCluster();
NODE_CLIENTS = null;
CLOUD_CLIENT = null;
}
public void testRandomUpdates() throws Exception {
final int maxDocId = LuceneTestCase.atLeast(TEST_NIGHTLY ? 10000 : 30);
final BitSet expectedDocIds = new BitSet(maxDocId+1);
final int numIters = TEST_NIGHTLY ? LuceneTestCase.atLeast(50) : 5;
for (int i = 0; i < numIters; i++) {
log.info("BEGIN ITER #{}", i);
final UpdateRequest req = update(params("maxErrors","-1",
"update.chain", "tolerant-chain-max-errors-10"));
final int numCmds = TestUtil.nextInt(random(), 1, 20);
final List<ExpectedErr> expectedErrors = new ArrayList<ExpectedErr>(numCmds);
int expectedErrorsCount = 0;
// it's ambigious/confusing which order mixed DELQ + ADD (or ADD and DELI for the same ID)
// in the same request wll be processed by various clients, so we keep things simple
// and ensure that no single doc Id is affected by more then one command in the same request
final BitSet docsAffectedThisRequest = new BitSet(maxDocId+1);
for (int cmdIter = 0; cmdIter < numCmds; cmdIter++) {
if ((maxDocId / 2) < docsAffectedThisRequest.cardinality()) {
// we're already mucking with more then half the docs in the index
break;
}
final boolean causeError = random().nextBoolean();
if (causeError) {
expectedErrorsCount++;
}
if (random().nextBoolean()) {
// add a doc
String id = null;
SolrInputDocument doc = null;
if (causeError && (0 == TestUtil.nextInt(random(), 0, 21))) {
doc = doc(f("foo_s","no unique key"));
expectedErrors.add(addErr("(unknown)"));
} else {
final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
docsAffectedThisRequest.set(id_i);
id = "id_"+id_i;
if (causeError) {
expectedErrors.add(addErr(id));
} else {
expectedDocIds.set(id_i);
}
final String val = causeError ? "bogus_val" : (""+TestUtil.nextInt(random(), 42, 666));
doc = doc(f("id",id),
f("id_i", id_i),
f("foo_i", val));
}
req.add(doc);
log.info("ADD: {} = {}", id, doc);
} else {
// delete something
if (random().nextBoolean()) {
// delete by id
final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
final String id = "id_"+id_i;
final boolean docExists = expectedDocIds.get(id_i);
docsAffectedThisRequest.set(id_i);
long versionConstraint = docExists ? 1 : -1;
if (causeError) {
versionConstraint = -1 * versionConstraint;
expectedErrors.add(delIErr(id));
} else {
// if doc exists it will legitimately be deleted
expectedDocIds.clear(id_i);
}
req.deleteById(id, versionConstraint);
log.info("DEL: {} = {}", id, causeError ? "ERR" : "OK" );
} else {
// delete by query
final String q;
if (causeError) {
// even though our DBQ is gibberish that's going to fail, record a docId as affected
// so that we don't generate the same random DBQ and get redundent errors
// (problematic because of how DUP forwarded DBQs have to have their errors deduped by TUP)
final int id_i = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
docsAffectedThisRequest.set(id_i);
q = "foo_i:["+id_i+" TO ....giberish";
expectedErrors.add(delQErr(q));
} else {
// ensure our DBQ is only over a range of docs not already affected
// by any other cmds in this request
final int rangeAxis = randomUnsetBit(random(), docsAffectedThisRequest, maxDocId);
final int loBound = docsAffectedThisRequest.previousSetBit(rangeAxis);
final int hiBound = docsAffectedThisRequest.nextSetBit(rangeAxis);
final int lo = TestUtil.nextInt(random(), loBound+1, rangeAxis);
final int hi = TestUtil.nextInt(random(), rangeAxis,
// bound might be negative if no set bits above axis
(hiBound < 0) ? maxDocId : hiBound-1);
if (lo != hi) {
assert lo < hi : "lo="+lo+" hi="+hi;
// NOTE: clear & set are exclusive of hi, so we use "}" in range query accordingly
q = "id_i:[" + lo + " TO " + hi + "}";
expectedDocIds.clear(lo, hi);
docsAffectedThisRequest.set(lo, hi);
} else {
// edge case: special case DBQ of one doc
assert (lo == rangeAxis && hi == rangeAxis) : "lo="+lo+" axis="+rangeAxis+" hi="+hi;
q = "id_i:[" + lo + " TO " + lo + "]"; // have to be inclusive of both ends
expectedDocIds.clear(lo);
docsAffectedThisRequest.set(lo);
}
}
req.deleteByQuery(q);
log.info("DEL: {}", q);
}
}
}
assertEquals("expected error count sanity check: " + req.toString(),
expectedErrorsCount, expectedErrors.size());
final SolrClient client = random().nextBoolean() ? CLOUD_CLIENT
: NODE_CLIENTS.get(TestUtil.nextInt(random(), 0, NODE_CLIENTS.size()-1));
final UpdateResponse rsp = req.process(client);
TestTolerantUpdateProcessorCloud.assertUpdateTolerantErrors(client.toString() + " => " + expectedErrors.toString(), rsp,
expectedErrors.toArray(new ExpectedErr[expectedErrors.size()]));
if (log.isInfoEnabled()) {
log.info("END ITER #{}, expecting #docs: {}", i, expectedDocIds.cardinality());
}
assertEquals("post update commit failed?", 0, CLOUD_CLIENT.commit().getStatus());
// MRM TODO: - see MRM TODO: below
// for (int j = 0; j < 100; j++) {
// if (expectedDocIds.cardinality() == countDocs(CLOUD_CLIENT)) {
// break;
// }
// log.info("sleeping to give searchers a chance to re-open #{}", j);
// Thread.sleep(250);
// }
// check the index contents against our expectations
final BitSet actualDocIds = allDocs(CLOUD_CLIENT, maxDocId);
if ( expectedDocIds.cardinality() != actualDocIds.cardinality() ) {
log.error("cardinality mismatch: expected {} BUT actual {}",
expectedDocIds.cardinality(),
actualDocIds.cardinality());
}
final BitSet x = (BitSet) actualDocIds.clone();
x.xor(expectedDocIds);
for (int b = x.nextSetBit(0); 0 <= b; b = x.nextSetBit(b+1)) {
final boolean expectedBit = expectedDocIds.get(b);
final boolean actualBit = actualDocIds.get(b);
log.error("bit #{} mismatch: expected {} BUT actual {}", b, expectedBit, actualBit);
}
// MRM TODO: - fails, investigate
// assertEquals(x.cardinality() + " mismatched bits",
// expectedDocIds.cardinality(), actualDocIds.cardinality());
}
}
/** sanity check that randomUnsetBit works as expected
* @see #randomUnsetBit
*/
@LuceneTestCase.AwaitsFix(bugUrl = "this sanity check is flakey...")
public void testSanityRandomUnsetBit() {
final int max = LuceneTestCase.atLeast(100);
BitSet bits = new BitSet(max+1);
for (int i = 0; i <= max; i++) {
assertFalse("how is bitset already full? iter="+i+" card="+bits.cardinality()+"/max="+max,
bits.cardinality() == max+1);
final int nextBit = randomUnsetBit(random(), bits, max);
assertTrue("nextBit shouldn't be negative yet: " + nextBit,
0 <= nextBit);
assertTrue("nextBit can't exceed max: " + nextBit,
nextBit <= max);
assertFalse("expect unset: " + nextBit, bits.get(nextBit));
bits.set(nextBit);
}
assertEquals("why isn't bitset full?", max+1, bits.cardinality());
final int firstClearBit = bits.nextClearBit(0);
assertTrue("why is there a clear bit? = " + firstClearBit,
max < firstClearBit);
assertEquals("why is a bit set above max?",
-1, bits.nextSetBit(max+1));
assertEquals("wrong nextBit at end of all iters", -1,
randomUnsetBit(random(), bits, max));
assertEquals("wrong nextBit at redundant end of all iters", -1,
randomUnsetBit(random(), bits, max));
}
public static SolrInputDocument doc(SolrInputField... fields) {
// SolrTestCaseJ4 has same method name, prevents static import from working
return TestTolerantUpdateProcessorCloud.doc(fields);
}
/**
* Given a BitSet, returns a random bit that is currently false, or -1 if all bits are true.
* NOTE: this method is not fair.
*/
public static final int randomUnsetBit(Random r, BitSet bits, final int max) {
// NOTE: don't forget, BitSet will grow automatically if not careful
if (bits.cardinality() == max+1) {
return -1;
}
final int candidate = TestUtil.nextInt(r, 0, max);
if (bits.get(candidate)) {
final int lo = bits.previousClearBit(candidate);
final int hi = bits.nextClearBit(candidate);
if (lo < 0 && max < hi) {
fail("how the hell did we not short circut out? card="+bits.cardinality()+"/size="+bits.size());
} else if (lo < 0) {
return hi;
} else if (max < hi) {
return lo;
} // else...
return ((candidate - lo) < (hi - candidate)) ? lo : hi;
}
return candidate;
}
/** returns the numFound from a *:* query */
public static final long countDocs(SolrClient c) throws Exception {
return c.query(params("q","*:*","rows","0")).getResults().getNumFound();
}
/** uses a Cursor to iterate over every doc in the index, recording the 'id_i' value in a BitSet */
private static final BitSet allDocs(final SolrClient c, final int maxDocIdExpected) throws Exception {
BitSet docs = new BitSet(maxDocIdExpected+1);
String cursorMark = CURSOR_MARK_START;
int docsOnThisPage = Integer.MAX_VALUE;
while (0 < docsOnThisPage) {
final SolrParams p = params("q","*:*",
"rows","100",
// note: not numeric, but we don't actual care about the order
"sort", "id asc",
CURSOR_MARK_PARAM, cursorMark);
QueryResponse rsp = c.query(p);
cursorMark = rsp.getNextCursorMark();
docsOnThisPage = 0;
for (SolrDocument doc : rsp.getResults()) {
docsOnThisPage++;
int id_i = ((Integer)doc.get("id_i")).intValue();
assertTrue("found id_i bigger then expected "+maxDocIdExpected+": " + id_i,
id_i <= maxDocIdExpected);
docs.set(id_i);
}
cursorMark = rsp.getNextCursorMark();
}
return docs;
}
}