| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.lucene.tests.util.LuceneTestCase; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.SolrRequest; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.request.CollectionAdminRequest; |
| import org.apache.solr.client.solrj.request.QueryRequest; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.params.SolrParams; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.handler.ReplicationHandler; |
| import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory; |
| import org.apache.solr.util.SecurityJson; |
| import org.apache.solr.util.TimeOut; |
| import org.junit.After; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Test of {@link DocExpirationUpdateProcessorFactory} in a cloud setup */ |
| @LuceneTestCase.Nightly // Has to do some sleeping to wait for a future expiration |
| public class DistribDocExpirationUpdateProcessorTest extends SolrCloudTestCase { |
| |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| private String COLLECTION = null; |
| private String USER = null; |
| private String PASS = null; |
| |
| @After |
| public void cleanup() throws Exception { |
| shutdownCluster(); |
| COLLECTION = null; |
| USER = null; |
| PASS = null; |
| } |
| |
| /** Modifies the request to include authentication params if needed, and returns the request */ |
| private <T extends SolrRequest<?>> T setAuthIfNeeded(T req) { |
| if (null != USER) { |
| assertNotNull(PASS); |
| req.setBasicAuthCredentials(USER, PASS); |
| } |
| return req; |
| } |
| |
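| /** |
| * Creates a 4 node cluster and a 2 shard x 2 replica collection using the "doc-expiry" |
| * configset. When {@code security} is true the cluster requires basic-auth, and |
| * {@link #setAuthIfNeeded} will attach credentials to every request. |
| */ |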
| public void setupCluster(boolean security) throws Exception { |
| // we want at most one core per node to force lots of network traffic, to try to tickle |
| // distributed bugs |
| final MiniSolrCloudCluster.Builder b = |
| configureCluster(4) |
| .addConfig( |
| "conf", TEST_PATH().resolve("configsets").resolve("doc-expiry").resolve("conf")); |
| |
| COLLECTION = "expiring"; |
| if (security) { |
| USER = SecurityJson.USER; |
| PASS = SecurityJson.PASS; |
| COLLECTION += "_secure"; |
| |
| b.withSecurityJson(SecurityJson.SIMPLE); |
| } |
| b.configure(); |
| |
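| // 2 shards x 2 replicas == 4 cores: one per node in our 4 node cluster |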
| setAuthIfNeeded(CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 2)) |
| .process(cluster.getSolrClient()); |
| |
| cluster |
| .getZkStateReader() |
| .waitForState( |
| COLLECTION, |
| DEFAULT_TIMEOUT, |
| TimeUnit.SECONDS, |
| (n, c) -> DocCollection.isFullyActive(n, c, 2, 2)); |
| } |
| |
| @Test |
| public void testNoAuth() throws Exception { |
| setupCluster(false); |
| runTest(); |
| } |
| |
| @Test |
| public void testBasicAuth() throws Exception { |
| setupCluster(true); |
| |
| // check that our cluster really does require authentication |
| assertEquals( |
| "check of unauthenticated request", |
| 401, |
| expectThrows( |
| SolrException.class, |
| () -> { |
| final long ignored = |
| cluster |
| .getSolrClient() |
| .query( |
| COLLECTION, |
| params( |
| "q", "*:*", |
| "rows", "0", |
| "_trace", "no_auth_check")) |
| .getResults() |
| .getNumFound(); |
| }) |
| .code()); |
| |
| runTest(); |
| } |
| |
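| /** |
| * Indexes a mix of docs with and without a short TTL, waits for the TTL'ed docs to be |
| * deleted, then adds a single "special" expiring doc and verifies that its expiration only |
| * causes index churn on the replicas of the one shard that hosted it. |
| */ |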
| private void runTest() throws Exception { |
| final int totalNumDocs = atLeast(50); |
| |
| // Add a bunch of docs; some with extremely short expiration, some with no expiration |
| // these should be randomly distributed to each shard |
| long numDocsThatNeverExpire = 0; |
| { |
| final UpdateRequest req = setAuthIfNeeded(new UpdateRequest()); |
| for (int i = 1; i <= totalNumDocs; i++) { |
| final SolrInputDocument doc = sdoc("id", i); |
| |
| if (random().nextBoolean()) { |
| doc.addField("should_expire_s", "yup"); |
| doc.addField("tTl_s", "+1SECONDS"); |
| } else { |
| numDocsThatNeverExpire++; |
| } |
| |
| req.add(doc); |
| } |
| req.commit(cluster.getSolrClient(), COLLECTION); |
| } |
| |
| // NOTE: don't assume we can find exactly totalNumDocs right now; some may have already been |
| // deleted... |
| |
| // it should not take long for us to get to the point where all 'should_expire_s:yup' docs are |
| // gone |
| waitForNoResults( |
| 30, params("q", "should_expire_s:yup", "rows", "0", "_trace", "init_batch_check")); |
| |
| { |
| // ...*NOW* we can assert that exactly numDocsThatNeverExpire should exist... |
| final QueryRequest req = |
| setAuthIfNeeded( |
| new QueryRequest( |
| params( |
| "q", "*:*", |
| "rows", "0", |
| "_trace", "count_non_expire_docs"))); |
| |
| // NOTE: it's possible that replicas could be out of sync, but this query may get lucky and |
| // only hit leaders. we'll compare the counts of every replica in every shard later on... |
| assertEquals( |
| numDocsThatNeverExpire, |
| req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound()); |
| } |
| |
| // now that we've confirmed the basics work, let's check some fine-grained stuff... |
| |
| // first off, check that this special docId doesn't somehow already exist |
| waitForNoResults(0, params("q", "id:special99", "rows", "0", "_trace", "sanity_check99")); |
| |
| { |
| // force a hard commit on all shards (the prior auto-expire would only have done a soft |
| // commit) so we can ensure our indexVersion won't change unnecessarily on the unaffected |
| // shard when we add & (hard) commit our special doc... |
| final UpdateRequest req = setAuthIfNeeded(new UpdateRequest()); |
| req.commit(cluster.getSolrClient(), COLLECTION); |
| } |
| |
| // record important data for each replica core, so we can check later that it only changes for |
| // the replicas of a single shard after we add/expire a single special doc |
| log.info("Fetching ReplicaData BEFORE special doc addition/expiration"); |
| final Map<String, ReplicaData> initReplicaData = getTestDataForAllReplicas(); |
| assertTrue("WTF? no replica data?", 0 < initReplicaData.size()); |
| |
| // add & hard commit a special doc with a short TTL |
| setAuthIfNeeded(new UpdateRequest()) |
| .add(sdoc("id", "special99", "should_expire_s", "yup", "tTl_s", "+30SECONDS")) |
| .commit(cluster.getSolrClient(), COLLECTION); |
| |
| // wait for our special docId to be deleted |
| waitForNoResults( |
| 180, params("q", "id:special99", "rows", "0", "_trace", "did_special_doc_expire_yet")); |
| |
| // now check all the replicas to verify a few things: |
| // - only the replicas of one shard changed -- no unnecessary churn on other shards |
| // - every replica of each single shard should have the same number of docs |
| // - the total number of docs should match numDocsThatNeverExpire |
| log.info("Fetching ReplicaData AFTER special doc addition/expiration"); |
| final Map<String, ReplicaData> finalReplicaData = getTestDataForAllReplicas(); |
| assertEquals("WTF? not same num replicas?", initReplicaData.size(), finalReplicaData.size()); |
| |
| final Set<String> coresThatChange = new HashSet<>(); |
| final Set<String> shardsThatChange = new HashSet<>(); |
| |
| int coresCompared = 0; |
| long totalDocsOnAllShards = 0; |
| final DocCollection collectionState = |
| cluster.getSolrClient().getClusterState().getCollection(COLLECTION); |
| for (Slice shard : collectionState) { |
| boolean firstReplica = true; |
| for (Replica replica : shard) { |
| coresCompared++; |
| assertEquals(shard.getName(), replica.getShard()); // sanity check |
| final String core = replica.getCoreName(); |
| final ReplicaData initData = initReplicaData.get(core); |
| final ReplicaData finalData = finalReplicaData.get(core); |
| assertNotNull(shard.getName() + ": no init data for core: " + core, initData); |
| assertNotNull(shard.getName() + ": no final data for core: " + core, finalData); |
| |
| if (!initData.equals(finalData)) { |
| log.error("ReplicaData changed: {} != {}", initData, finalData); |
| coresThatChange.add(core + "(" + shard.getName() + ")"); |
| shardsThatChange.add(shard.getName()); |
| } |
| |
| if (firstReplica) { |
| totalDocsOnAllShards += finalData.numDocs; |
| firstReplica = false; |
| } |
| } |
| } |
| |
| assertEquals( |
| "Exactly one shard should have changed, instead: " |
| + shardsThatChange |
| + " cores=(" |
| + coresThatChange |
| + ")", |
| 1, |
| shardsThatChange.size()); |
| assertEquals("somehow we missed some cores?", initReplicaData.size(), coresCompared); |
| |
| assertEquals( |
| "Final tally has incorrect numDocsThatNeverExpire", |
| numDocsThatNeverExpire, |
| totalDocsOnAllShards); |
| |
| // TODO: above logic verifies that deleteByQuery happens on all nodes, and ... |
| // doesn't affect searcher re-open on shards w/o expired docs ... can we also verify |
| // that *only* one node is sending the deletes? |
| // (ie: no flood of redundant deletes?) |
| |
| } |
| |
| /** |
| * Returns a map whose key is the core name and whose value is the data about that core needed |
| * for the test. |
| */ |
| private Map<String, ReplicaData> getTestDataForAllReplicas() |
| throws IOException, SolrServerException { |
| Map<String, ReplicaData> results = new HashMap<>(); |
| |
| DocCollection collectionState = |
| cluster.getSolrClient().getClusterState().getCollection(COLLECTION); |
| |
| for (Replica replica : collectionState.getReplicas()) { |
| |
| String coreName = replica.getCoreName(); |
| try (SolrClient client = getHttpSolrClient(replica)) { |
| |
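| // ask this core's ReplicationHandler for its current index version |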
| ModifiableSolrParams params = new ModifiableSolrParams(); |
| params.set("command", "indexversion"); |
| params.set("_trace", "getIndexVersion"); |
| params.set("qt", ReplicationHandler.PATH); |
| QueryRequest req = setAuthIfNeeded(new QueryRequest(params)); |
| |
| NamedList<Object> res = client.request(req); |
| assertNotNull("null response from server: " + coreName, res); |
| |
| Object version = res.get("indexversion"); |
| assertNotNull("null version from server: " + coreName, version); |
| assertTrue("version isn't a long: " + coreName, version instanceof Long); |
| |
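| // count the docs on this specific core (distrib=false) so replicas can be compared |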
| long numDocs = |
| setAuthIfNeeded( |
| new QueryRequest( |
| params( |
| "q", "*:*", |
| "distrib", "false", |
| "rows", "0", |
| "_trace", "counting_docs"))) |
| .process(client) |
| .getResults() |
| .getNumFound(); |
| |
| final ReplicaData data = |
| new ReplicaData(replica.getShard(), coreName, (Long) version, numDocs); |
| log.info("{}", data); |
| results.put(coreName, data); |
| } |
| } |
| |
| return results; |
| } |
| |
| /** |
| * Executes a query over and over against the cloud client, sleeping up to 5 seconds between |
| * attempts, until the numFound is 0 or the maxTimeLimitSeconds is exceeded. The query is |
| * guaranteed to be executed at least once. |
| */ |
| private void waitForNoResults(int maxTimeLimitSeconds, SolrParams params) |
| throws SolrServerException, InterruptedException, IOException { |
| |
| final QueryRequest req = setAuthIfNeeded(new QueryRequest(params)); |
| final TimeOut timeout = |
| new TimeOut(maxTimeLimitSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| |
| long numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound(); |
| while (0L < numFound && !timeout.hasTimedOut()) { |
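| // sleep at most 5 seconds per attempt, but never longer than the time remaining |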
| Thread.sleep(Math.max(1, Math.min(5000, timeout.timeLeft(TimeUnit.MILLISECONDS)))); |
| |
| numFound = req.process(cluster.getSolrClient(), COLLECTION).getResults().getNumFound(); |
| } |
| |
| assertEquals("Give up waiting for no results: " + params, 0L, numFound); |
| } |
| |
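| /** Immutable snapshot of the index version and doc count of a single replica's core */ |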
| private static class ReplicaData { |
| public final String shardName; |
| public final String coreName; |
| public final long indexVersion; |
| public final long numDocs; |
| |
| public ReplicaData( |
| final String shardName, |
| final String coreName, |
| final long indexVersion, |
| final long numDocs) { |
| assertNotNull(shardName); |
| assertNotNull(coreName); |
| |
| this.shardName = shardName; |
| this.coreName = coreName; |
| this.indexVersion = indexVersion; |
| this.numDocs = numDocs; |
| } |
| |
| @Override |
| public String toString() { |
| return "ReplicaData(shard=" |
| + shardName |
| + ",core=" |
| + coreName |
| + ",indexVer=" |
| + indexVersion |
| + ",numDocs=" |
| + numDocs |
| + ")"; |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (other instanceof ReplicaData) { |
| ReplicaData that = (ReplicaData) other; |
| return this.shardName.equals(that.shardName) |
| && this.coreName.equals(that.coreName) |
| && (this.indexVersion == that.indexVersion) |
| && (this.numDocs == that.numDocs); |
| } |
| return false; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(this.shardName, this.coreName, this.indexVersion, this.numDocs); |
| } |
| } |
| } |