/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory;
import org.apache.solr.util.SecurityJson;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Test of {@link DocExpirationUpdateProcessorFactory} in a cloud setup */
@LuceneTestCase.Nightly // Has to do some sleeping to wait for a future expiration
public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String COLLECTION = null;
private String USER = null;
private String PASS = null;
@After
public void cleanup() throws Exception {
shutdownCluster();
COLLECTION = null;
USER = null;
PASS = null;
}
/** Modifies the request to include authentication params if needed; returns the request */
private <T extends SolrRequest<?>> T setAuthIfNeeded(T req) {
if (null != USER) {
assertNotNull(PASS);
req.setBasicAuthCredentials(USER, PASS);
}
return req;
}
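/**
 * Starts a 4 node cluster using the doc-expiry configset and creates a 2 shard x 2 replica
 * collection, optionally configured to require basic auth on every request.
 */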
public void setupCluster(boolean security) throws Exception {
// we want at most one core per node to force lots of network traffic to try to tickle
// distributed bugs
final MiniSolrCloudCluster.Builder b =
configureCluster(4)
.addConfig(
"conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"));
COLLECTION = "expiring";
if (security) {
USER = SecurityJson.USER;
PASS = SecurityJson.PASS;
COLLECTION += "_secure";
b.withSecurityJson(SecurityJson.SIMPLE);
}
b.configure();
setAuthIfNeeded(CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2))
.process(cluster.getSolrClient());
cluster
.getZkStateReader()
.waitForState(
COLLECTION,
DEFAULT_TIMEOUT,
TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
}
@Test
public void testNoAuth() throws Exception {
setupCluster(false);
runTest();
}
@Test
public void testBasicAuth() throws Exception {
setupCluster(true);
// check that our cluster really does require authentication
assertEquals(
"check of unauthenticated request",
401,
expectThrows(
SolrException.class,
() -> {
final long ignored =
cluster
.getSolrClient()
.query(
COLLECTION,
params(
"q", "*:*",
"rows", "0",
"_trace", "no_auth_check"))
.getResults()
.getNumFound();
})
.code());
runTest();
}
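/**
 * Indexes a mix of docs with and without (short) TTLs, waits until all the expiring docs have
 * been deleted, then adds and expires one more "special" doc and verifies that only the
 * replicas of a single shard changed as a result.
 */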
private void runTest() throws Exception {
final int totalNumDocs = atLeast(50);
// Add a bunch of docs; some with extremely short expiration, some with no expiration at all.
// These should be randomly distributed across the shards.
long numDocsThatNeverExpire = 0;
{
final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
for (int i = 1; i <= totalNumDocs; i++) {
final SolrInputDocument doc = sdoc("id", i);
if (random().nextBoolean()) {
doc.addField("should_expire_s", "yup");
doc.addField("tTl_s", "+1SECONDS");
} else {
numDocsThatNeverExpire++;
}
req.add(doc);
}
req.commit(cluster.getSolrClient(), COLLECTION);
}
// NOTE: don't assume we can find exactly totalNumDocs right now, some may have already been
// deleted...
// it should not take long for us to get to the point where all 'should_expire_s:yup' docs are
// gone
waitForNoResults(
30, params("q", "should_expire_s:yup", "rows", "0", "_trace", "init_batch_check"));
{
// ...*NOW* we can assert that exactly numDocsThatNeverExpire should exist...
final QueryRequest req =
setAuthIfNeeded(
new QueryRequest(
params(
"q", "*:*",
"rows", "0",
"_trace", "count_non_expire_docs")));
// NOTE: it's possible that replicas could be out of sync but this query may get lucky and
// only hit leaders. we'll compare the counts of every replica in every shard later on...
assertEquals(
numDocsThatNeverExpire,
req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound());
}
// now that we've confirmed the basics work, let's check some fine-grained stuff...
// first off, check that this special docId doesn't somehow already exist
waitForNoResults(0, params("q", "id:special99", "rows", "0", "_trace", "sanity_check99"));
{
// force a hard commit on all shards (the prior auto-expire would have only done a soft
// commit) so we can ensure our indexVersion won't change unnecessarily on the unaffected
// shard when we add & (hard) commit our special doc...
final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
req.commit(cluster.getSolrClient(), COLLECTION);
}
// record important data for each replica core, so we can check later that it only changes for
// the replicas of a single shard after we add/expire a single special doc
log.info("Fetching ReplicaData BEFORE special doc addition/expiration");
final Map<String, ReplicaData> initReplicaData = getTestDataForAllReplicas();
assertTrue("WTF? no replica data?", 0 < initReplicaData.size());
// add & hard commit a special doc with a short TTL
setAuthIfNeeded(new UpdateRequest())
.add(sdoc("id", "special99", "should_expire_s", "yup", "tTl_s", "+30SECONDS"))
.commit(cluster.getSolrClient(), COLLECTION);
// wait for our special docId to be deleted
waitForNoResults(
180, params("q", "id:special99", "rows", "0", "_trace", "did_special_doc_expire_yet"));
// now check all the replicas to verify a few things:
// - only the replicas of one shard changed -- no unnecessary churn on other shards
// - every replica of each single shard should have the same number of docs
// - the total number of docs should match numDocsThatNeverExpire
log.info("Fetching ReplicaData AFTER special doc addition/expiration");
final Map<String, ReplicaData> finalReplicaData = getTestDataForAllReplicas();
assertEquals("WTF? not same num replicas?", initReplicaData.size(), finalReplicaData.size());
final Set<String> coresThatChange = new HashSet<>();
final Set<String> shardsThatChange = new HashSet<>();
int coresCompared = 0;
long totalDocsOnAllShards = 0;
final DocCollection collectionState =
cluster.getSolrClient().getClusterState().getCollection(COLLECTION);
for (Slice shard : collectionState) {
boolean firstReplica = true;
for (Replica replica : shard) {
coresCompared++;
assertEquals(shard.getName(), replica.getShard()); // sanity check
final String core = replica.getCoreName();
final ReplicaData initData = initReplicaData.get(core);
final ReplicaData finalData = finalReplicaData.get(core);
assertNotNull(shard.getName() + ": no init data for core: " + core, initData);
assertNotNull(shard.getName() + ": no final data for core: " + core, finalData);
if (!initData.equals(finalData)) {
log.error("ReplicaData changed: {} != {}", initData, finalData);
coresThatChange.add(core + "(" + shard.getName() + ")");
shardsThatChange.add(shard.getName());
}
if (firstReplica) {
totalDocsOnAllShards += finalData.numDocs;
firstReplica = false;
}
}
}
assertEquals(
"Exactly one shard should have changed, instead: "
+ shardsThatChange
+ " cores=("
+ coresThatChange
+ ")",
1,
shardsThatChange.size());
assertEquals("somehow we missed some cores?", initReplicaData.size(), coresCompared);
assertEquals(
"Final tally has incorrect numDocsThatNeverExpire",
numDocsThatNeverExpire,
totalDocsOnAllShards);
// TODO: above logic verifies that deleteByQuery happens on all nodes, and ...
// doesn't affect searcher re-open on shards w/o expired docs ... can we also verify
// that *only* one node is sending the deletes ?
// (ie: no flood of redundant deletes?)
}
/**
 * Returns a map whose key is the core name and whose value is data about that core needed for
 * the test
 */
private Map<String, ReplicaData> getTestDataForAllReplicas()
throws IOException, SolrServerException {
Map<String, ReplicaData> results = new HashMap<>();
DocCollection collectionState =
cluster.getSolrClient().getClusterState().getCollection(COLLECTION);
for (Replica replica : collectionState.getReplicas()) {
String coreName = replica.getCoreName();
try (SolrClient client = getHttpSolrClient(replica)) {
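// ask this core's ReplicationHandler for its current index version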
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("command", "indexversion");
params.set("_trace", "getIndexVersion");
params.set("qt", ReplicationHandler.PATH);
QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
NamedList<Object> res = client.request(req);
assertNotNull("null response from server: " + coreName, res);
Object version = res.get("indexversion");
assertNotNull("null version from server: " + coreName, version);
assertTrue("version isn't a long: " + coreName, version instanceof Long);
long numDocs =
setAuthIfNeeded(
new QueryRequest(
params(
"q", "*:*",
"distrib", "false",
"rows", "0",
"_trace", "counting_docs")))
.process(client)
.getResults()
.getNumFound();
final ReplicaData data =
new ReplicaData(replica.getShard(), coreName, (Long) version, numDocs);
log.info("{}", data);
results.put(coreName, data);
}
}
return results;
}
/**
 * Executes a query over and over against the cloudClient, sleeping up to 5 seconds between
 * attempts, until the numFound is 0 or the maxTimeLimitSeconds is exceeded. The query is
 * guaranteed to be executed at least once.
 */
private void waitForNoResults(int maxTimeLimitSeconds, SolrParams params)
throws SolrServerException, InterruptedException, IOException {
final QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
final TimeOut timeout =
new TimeOut(maxTimeLimitSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
long numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
while (0L < numFound && !timeout.hasTimedOut()) {
Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS))));
numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
}
assertEquals("Give up waiting for no results: " + params, 0L, numFound);
}
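/**
 * Immutable snapshot of a single replica core's index version and (local) doc count, used to
 * detect which cores change as a result of doc expiration.
 */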
private static class ReplicaData {
public final String shardName;
public final String coreName;
public final long indexVersion;
public final long numDocs;
public ReplicaData(
final String shardName,
final String coreName,
final long indexVersion,
final long numDocs) {
assertNotNull(shardName);
assertNotNull(coreName);
this.shardName = shardName;
this.coreName = coreName;
this.indexVersion = indexVersion;
this.numDocs = numDocs;
}
@Override
public String toString() {
return "ReplicaData(shard="
+ shardName
+ ",core="
+ coreName
+ ",indexVer="
+ indexVersion
+ ",numDocs="
+ numDocs
+ ")";
}
@Override
public boolean equals(Object other) {
if (other instanceof ReplicaData) {
ReplicaData that = (ReplicaData) other;
return this.shardName.equals(that.shardName)
&& this.coreName.equals(that.coreName)
&& (this.indexVersion == that.indexVersion)
&& (this.numDocs == that.numDocs);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(this.shardName, this.coreName, this.indexVersion, this.numDocs);
}
}
}