/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singletonMap;
import static java.util.Collections.singletonList;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.SolrTestCaseUtil;
import org.apache.solr.SolrTestUtil;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.security.BasicAuthPlugin;
import org.apache.solr.security.RuleBasedAuthorizationPlugin;
import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory;
import org.apache.solr.util.TimeOut;
import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Test of {@link DocExpirationUpdateProcessorFactory} in a cloud setup */
@Slow // Has to do some sleeping to wait for a future expiration
@LuceneTestCase.Nightly // TODO speedup
public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
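// Per-test state: collection name and (optional) basic-auth credentials; populated by setupCluster() and cleared in cleanup()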
private String COLLECTION = null;
private String USER = null;
private String PASS = null;
@After
public void cleanup() throws Exception {
shutdownCluster();
COLLECTION = null;
USER = null;
PASS = null;
}
/**
* Modifies the request to include authentication params if needed, and returns the request
*/
private <T extends SolrRequest> T setAuthIfNeeded(T req) {
if (null != USER) {
assert null != PASS;
req.setBasicAuthCredentials(USER, PASS);
}
return req;
}
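/**
 * Starts a 4 node cluster using the doc-expiry configset and creates a 2 shard x 2 replica collection,
 * optionally configuring Basic Auth and rule-based authorization when {@code security} is true.
 */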
public void setupCluster(boolean security) throws Exception {
// we want at most one core per node to force lots of network traffic to try and tickle distributed bugs
final Builder b = configureCluster(4)
.addConfig("conf", SolrTestUtil.TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf"));
COLLECTION = "expiring";
if (security) {
USER = "solr";
PASS = "SolrRocksAgain";
COLLECTION += "_secure";
final String SECURITY_JSON = Utils.toJSONString
(Utils.makeMap("authorization",
Utils.makeMap("class", RuleBasedAuthorizationPlugin.class.getName(),
"user-role", singletonMap(USER,"admin"),
"permissions", singletonList(Utils.makeMap("name","all",
"role","admin"))),
"authentication",
Utils.makeMap("class", BasicAuthPlugin.class.getName(),
"blockUnknown",true,
"credentials", singletonMap(USER, getSaltedHashedValue(PASS)))));
b.withSecurityJson(SECURITY_JSON);
}
b.configure();
setAuthIfNeeded(CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2))
.process(cluster.getSolrClient());
cluster.getSolrClient().waitForState(COLLECTION, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_UNIT,
(n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
}
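/** Runs the expiration test against a cluster with no authentication configured. */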
@Test
public void testNoAuth() throws Exception {
setupCluster(false);
runTest();
}
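/** Runs the expiration test against a cluster that requires Basic Auth for every request. */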
@Ignore // MRM TODO:
@Test
public void testBasicAuth() throws Exception {
setupCluster(true);
// sanity check that our cluster really does require authentication
assertEquals("sanity check of non authenticated request",
401, SolrTestCaseUtil.expectThrows(SolrException.class, () -> {
final long ignored = cluster.getSolrClient().query(COLLECTION, params("q", "*:*", "rows", "0", "_trace", "no_auth_sanity_check")).getResults().getNumFound();
}).code());
runTest();
}
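/**
 * Indexes a mix of expiring and non-expiring docs, waits for the expiring docs to be deleted,
 * then adds a single short-lived doc and verifies (via per-replica index versions and doc counts)
 * that its expiration only caused index churn on the one shard that hosted it.
 */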
private void runTest() throws Exception {
final int totalNumDocs = SolrTestUtil.atLeast(50);
// Add a bunch of docs; some with extremely short expiration, some with no expiration
// these should be randomly distributed to each shard
long numDocsThatNeverExpire = 0;
{
final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
for (int i = 1; i <= totalNumDocs; i++) {
final SolrInputDocument doc = SolrTestCaseJ4.sdoc("id", i);
if (random().nextBoolean()) {
doc.addField("should_expire_s","yup");
doc.addField("tTl_s","+1SECONDS");
} else {
numDocsThatNeverExpire++;
}
req.add(doc);
}
req.commit(cluster.getSolrClient(), COLLECTION);
}
// NOTE: don't assume we can find exactly totalNumDocs right now, some may have already been deleted...
// it should not take long for us to get to the point where all 'should_expire_s:yup' docs are gone
waitForNoResults(10, params("q","should_expire_s:yup","rows","0","_trace","init_batch_check"));
{
// ...*NOW* we can assert that exactly numDocsThatNeverExpire should exist...
final QueryRequest req = setAuthIfNeeded(new QueryRequest
(params("q", "*:*",
"rows", "0",
"_trace", "count_non_expire_docs")));
// NOTE: it's possible that replicas could be out of sync but this query may get lucky and
// only hit leaders. we'll compare the counts of every replica in every shard later on...
assertEquals(numDocsThatNeverExpire,
req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound());
}
//
// now that we've confirmed the basics work, let's check some fine-grained stuff...
//
// first off, sanity check that this special docId doesn't somehow already exist
waitForNoResults(0, params("q","id:special99","rows","0","_trace","sanity_check99"));
{
// force a hard commit on all shards (the prior auto-expire would have only done a soft commit)
// so we can ensure our indexVersion won't change unnecessarily on the unaffected
// shard when we add & (hard) commit our special doc...
final UpdateRequest req = setAuthIfNeeded(new UpdateRequest());
req.commit(cluster.getSolrClient(), COLLECTION);
}
// record important data for each replica core so we can check later
// that it only changes for the replicas of a single shard after we add/expire a single special doc
log.info("Fetching ReplicaData BEFORE special doc addition/expiration");
final Map<String,ReplicaData> initReplicaData = getTestDataForAllReplicas();
assertTrue("WTF? no replica data?", 0 < initReplicaData.size());
// add & hard commit a special doc with a short TTL
setAuthIfNeeded(new UpdateRequest()).add(SolrTestCaseJ4.sdoc("id", "special99", "should_expire_s","yup","tTl_s","+30SECONDS"))
.commit(cluster.getSolrClient(), COLLECTION);
// wait for our special docId to be deleted
waitForNoResults(180, params("q","id:special99","rows","0","_trace","did_special_doc_expire_yet"));
// now check all of the replicas to verify a few things:
// - only the replicas of one shard changed -- no unnecessary churn on other shards
// - every replica of each single shard should have the same number of docs
// - the total number of docs should match numDocsThatNeverExpire
log.info("Fetching ReplicaData AFTER special doc addition/expiration");
final Map<String,ReplicaData> finalReplicaData = getTestDataForAllReplicas();
assertEquals("WTF? not same num replicas?",
initReplicaData.size(),
finalReplicaData.size());
final Set<String> coresThatChange = new HashSet<>();
final Set<String> shardsThatChange = new HashSet<>();
int coresCompared = 0;
long totalDocsOnAllShards = 0;
final DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
for (Slice shard : collectionState) {
boolean firstReplica = true;
for (Replica replica : shard) {
coresCompared++;
assertEquals(shard.getName(), replica.getSlice()); // sanity check
final String core = replica.getName();
final ReplicaData initData = initReplicaData.get(core);
final ReplicaData finalData = finalReplicaData.get(core);
assertNotNull(shard.getName() + ": no init data for core: " + core, initData);
assertNotNull(shard.getName() + ": no final data for core: " + core, finalData);
if (!initData.equals(finalData)) {
log.error("ReplicaData changed: {} != {}", initData, finalData);
coresThatChange.add(core + "("+shard.getName()+")");
shardsThatChange.add(shard.getName());
}
if (firstReplica) {
totalDocsOnAllShards += finalData.numDocs;
firstReplica = false;
}
}
}
assertEquals("Exactly one shard should have changed, instead: " + shardsThatChange
+ " cores=(" + coresThatChange + ")",
1, shardsThatChange.size());
assertEquals("somehow we missed some cores?",
initReplicaData.size(), coresCompared);
assertEquals("Final tally has incorrect numDocsThatNeverExpire",
numDocsThatNeverExpire, totalDocsOnAllShards);
// TODO: above logic verifies that deleteByQuery happens on all nodes, and ...
// doesn't affect searcher re-open on shards w/o expired docs ... can we also verify
// that *only* one node is sending the deletes ?
// (ie: no flood of redundant deletes?)
}
/**
* returns a map whose key is the replica's core name and whose value is data about that core needed for the test
*/
private Map<String,ReplicaData> getTestDataForAllReplicas() throws IOException, SolrServerException {
Map<String,ReplicaData> results = new HashMap<>();
DocCollection collectionState = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
for (Replica replica : collectionState.getReplicas()) {
String coreName = replica.getName();
try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(replica.getCoreUrl())) {
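// ask this core's ReplicationHandler for its current index version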
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("command", "indexversion");
params.set("_trace", "getIndexVersion");
params.set("qt", ReplicationHandler.PATH);
QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
NamedList<Object> res = client.request(req);
assertNotNull("null response from server: " + coreName, res);
Object version = res.get("indexversion");
assertNotNull("null version from server: " + coreName, version);
assertTrue("version isn't a long: " + coreName, version instanceof Long);
long numDocs =
setAuthIfNeeded(new QueryRequest
(params("q", "*:*",
"distrib", "false",
"rows", "0",
"_trace", "counting_docs"))).process(client).getResults().getNumFound();
final ReplicaData data = new ReplicaData(replica.getSlice(),coreName,(Long)version,numDocs);
log.info("{}", data);
results.put(coreName, data);
}
}
return results;
}
/**
* Repeatedly executes the query against the cloudClient (sleeping up to 1.5 seconds between attempts)
* until the numFound is 0 or the maxTimeLimitSeconds is exceeded.
* Query is guaranteed to be executed at least once.
*/
private void waitForNoResults(int maxTimeLimitSeconds,
SolrParams params)
throws SolrServerException, InterruptedException, IOException {
final QueryRequest req = setAuthIfNeeded(new QueryRequest(params));
final TimeOut timeout = new TimeOut(maxTimeLimitSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
long numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
while (0L < numFound && ! timeout.hasTimedOut()) {
Thread.sleep(Math.max(1, Math.min(1500, timeout.timeLeft(TimeUnit.MILLISECONDS))));
numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound();
}
assertEquals("Give up waiting for no results: " + params,
0L, numFound);
}
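/** Immutable snapshot of a single core's shard, name, index version, and doc count, used to detect index churn. */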
private static class ReplicaData {
public final String shardName;
public final String coreName;
public final long indexVersion;
public final long numDocs;
public ReplicaData(final String shardName,
final String coreName,
final long indexVersion,
final long numDocs) {
assert null != shardName;
assert null != coreName;
this.shardName = shardName;
this.coreName = coreName;
this.indexVersion = indexVersion;
this.numDocs = numDocs;
}
@Override
public String toString() {
return "ReplicaData(shard="+shardName+",core="+coreName+
",indexVer="+indexVersion+",numDocs="+numDocs+")";
}
@Override
public boolean equals(Object other) {
if (other instanceof ReplicaData) {
ReplicaData that = (ReplicaData)other;
return
this.shardName.equals(that.shardName) &&
this.coreName.equals(that.coreName) &&
(this.indexVersion == that.indexVersion) &&
(this.numDocs == that.numDocs);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(this.shardName, this.coreName, this.indexVersion, this.numDocs);
}
}
}