/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.junit.Ignore;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
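/**
 * Shared helpers for tests of update processing against routed aliases. Subclasses supply the
 * alias under test via {@link #getAlias()} and a client via {@link #getSolrClient()}.
 */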
@org.apache.lucene.util.LuceneTestCase.AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13696")
@Ignore // don't try to run the abstract base class
public abstract class RoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
private static final String intField = "integer_i";
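/**
 * Waits for the collection named {@code alias + separator + suffix} to exist with the given
 * number of slices, and then for the alias to list that collection.
 */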
void waitColAndAlias(String alias, String separator, final String suffix, int slices) throws InterruptedException {
// collection to exist
String collection = alias + separator + suffix;
waitCol(slices, collection);
// and alias to be aware of collection
long start = System.nanoTime(); // mumble mumble precommit mumble mumble...
while (!haveCollection(alias, collection)) {
if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 10) {
fail("took over 10 seconds after collection creation to update aliases");
} else {
Thread.sleep(500);
}
}
try {
DocCollection confirmCollection = cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollectionOrNull(collection);
assertNotNull("Unable to find collection we were waiting for after done waiting",confirmCollection);
} catch (IOException e) {
fail("exception getting collection we were waiting for and have supposedly created already");
}
}
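/** True if the alias currently maps to the given collection, according to the ZK-backed alias state. */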
private boolean haveCollection(String alias, String collection) {
// separated into separate lines to make it easier to track down an NPE that occurred once in
// ~3000 runs, if it shows up again...
CloudSolrClient solrClient = cluster.getSolrClient();
ZkStateReader zkStateReader = solrClient.getZkStateReader();
Aliases aliases = zkStateReader.getAliases();
Map<String, List<String>> collectionAliasListMap = aliases.getCollectionAliasListMap();
List<String> strings = collectionAliasListMap.get(alias);
return strings.contains(collection);
}
/** @see TrackingUpdateProcessorFactory */
String getTrackUpdatesGroupName() {
return getSaferTestName();
}
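/**
 * Creates a config set named {@code configName}, based on "_default" and modified to add the
 * update processors these tests rely on; see the inline comments for the approach.
 */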
void createConfigSet(String configName) throws SolrServerException, IOException, InterruptedException {
// First create a configSet
// Then we create a collection with the name of the eventual config.
// We configure it, and ultimately delete the collection, leaving a modified config-set behind.
// Later we create the "real" collections referencing this modified config-set.
assertEquals(0, new ConfigSetAdminRequest.Create()
.setConfigSetName(configName)
.setBaseConfigSetName("_default")
.process(getSolrClient()).getStatus());
CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(getSolrClient());
// TODO: fix SOLR-13059, a case where this wait isn't working ~0.3% of the time without the sleep.
waitCol(1,configName);
Thread.sleep(500); // YUCK but works (beasts 2500x20 ok vs failing in ~500x20 every time)
// manipulate the config...
checkNoError(getSolrClient().request(new V2Request.Builder("/collections/" + configName + "/config")
.POST()
.withPayload("{" +
" 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
" 'add-updateprocessor' : {" +
" 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
" }," +
// See TrackingUpdateProcessorFactory javadocs for details...
" 'add-updateprocessor' : {" +
" 'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'" + getTrackUpdatesGroupName() + "'" +
" }," +
" 'add-updateprocessor' : {" + // for testing
" 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
" 'fieldName':'" + getIntField() + "'" +
" }," +
"}").build()));
// only sometimes test with "tolerant" URP:
final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
checkNoError(getSolrClient().request(new V2Request.Builder("/collections/" + configName + "/config/params")
.POST()
.withPayload("{" +
" 'set' : {" +
" '_UPDATE' : {'processor':'" + urpNames + "'}" +
" }" +
"}").build()));
CollectionAdminRequest.deleteCollection(configName).process(getSolrClient());
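// the modified config set should survive the deletion of the temporary collection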
assertTrue(
new ConfigSetAdminRequest.List().process(getSolrClient()).getConfigSets()
.contains(configName)
);
}
String getIntField() {
return intField;
}
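/** Fails if the response contains any "errorMessages". */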
@SuppressWarnings("WeakerAccess")
void checkNoError(NamedList<Object> response) {
Object errors = response.get("errorMessages");
assertNull("" + errors, errors);
}
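/** Returns the core names of all leader replicas hosted by the cluster's Jetty nodes. */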
@SuppressWarnings("WeakerAccess")
Set<String> getLeaderCoreNames(ClusterState clusterState) {
Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging...
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
for (JettySolrRunner jettySolrRunner : jettySolrRunners) {
List<CoreDescriptor> coreDescriptors = jettySolrRunner.getCoreContainer().getCoreDescriptors();
for (CoreDescriptor core : coreDescriptors) {
String nodeName = jettySolrRunner.getNodeName();
String collectionName = core.getCollectionName();
DocCollection collectionOrNull = clusterState.getCollectionOrNull(collectionName);
if (collectionOrNull == null) {
continue; // the core's collection isn't (or is no longer) in the cluster state; nothing to count
}
List<Replica> leaderReplicas = collectionOrNull.getLeaderReplicas(nodeName);
if (leaderReplicas != null) {
for (Replica leaderReplica : leaderReplicas) {
leaders.add(leaderReplica.getCoreName());
}
}
}
}
return leaders;
}
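/** Asserts that each tracked update command was executed on a leader core. */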
void assertRouting(int numShards, List<UpdateCommand> updateCommands) throws IOException {
try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) {
ClusterStateProvider clusterStateProvider = cloudSolrClient.getClusterStateProvider();
clusterStateProvider.connect();
Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState());
assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size());
assertEquals(3, updateCommands.size());
for (UpdateCommand updateCommand : updateCommands) {
String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
assertTrue("Update was not routed to a leader (" + node + " not in list of leaders" + leaders, leaders.contains(node));
}
}
}
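/** Waits up to 60 seconds until the cluster hosts exactly {@code count} cores for the collection. */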
protected void waitCoreCount(String collection, int count) {
long start = System.nanoTime();
int coreFooCount;
List<JettySolrRunner> jsrs = cluster.getJettySolrRunners();
do {
coreFooCount = 0;
// have to check all jetties... there was a very confusing bug where we only checked one and
// thus might pick a jetty without a core for the collection and succeed if count = 0 when we
// should have failed, or at least waited longer
for (JettySolrRunner jsr : jsrs) {
List<CoreDescriptor> coreDescriptors = jsr.getCoreContainer().getCoreDescriptors();
for (CoreDescriptor coreDescriptor : coreDescriptors) {
String collectionName = coreDescriptor.getCollectionName();
if (collection.equals(collectionName)) {
coreFooCount++;
}
}
}
if (NANOSECONDS.toSeconds(System.nanoTime() - start) > 60) {
fail("took over 60 seconds after collection creation to update aliases:"+collection + " core count=" + coreFooCount + " was looking for " + count);
} else {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
} while(coreFooCount != count);
}
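/** The name of the routed alias under test. */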
public abstract String getAlias();
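/** The client used for requests against the test cluster. */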
public abstract CloudSolrClient getSolrClient();
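/** Waits until the collection exists and has the expected number of active slices. */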
@SuppressWarnings("WeakerAccess")
void waitCol(int slices, String collection) {
waitForState("waiting for collections to be created", collection,
(liveNodes, collectionState) -> {
if (collectionState == null) {
// per predicate javadoc, this is what we get if the collection doesn't exist at all.
return false;
}
Collection<Slice> activeSlices = collectionState.getActiveSlices();
int size = activeSlices.size();
return size == slices;
});
}
/** Adds these documents and commits, returning when they are committed.
* We randomly go about this in different ways. */
void addDocsAndCommit(boolean aliasOnly, SolrInputDocument... solrInputDocuments) throws Exception {
// we assume all docs will be added (none too old/new to cause exception)
Collections.shuffle(Arrays.asList(solrInputDocuments), random());
// this is a list of the collections & the alias name. Use to pick randomly where to send.
// (it doesn't matter where we send docs since the alias is honored at the URP level)
List<String> collections = new ArrayList<>();
collections.add(getAlias());
if (!aliasOnly) {
collections.addAll(new CollectionAdminRequest.ListAliases().process(getSolrClient()).getAliasesAsLists().get(getAlias()));
}
int commitWithin = random().nextBoolean() ? -1 : 500; // if -1, we commit explicitly instead
if (random().nextBoolean()) {
// Send in separate threads. Choose random collection & solrClient
ExecutorService exec = null;
try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
try {
exec = ExecutorUtil.newMDCAwareFixedThreadPool(1 + random().nextInt(2),
new SolrNamedThreadFactory(getSaferTestName()));
List<Future<UpdateResponse>> futures = new ArrayList<>(solrInputDocuments.length);
for (SolrInputDocument solrInputDocument : solrInputDocuments) {
String col = collections.get(random().nextInt(collections.size()));
futures.add(exec.submit(() -> solrClient.add(col, solrInputDocument, commitWithin)));
}
for (Future<UpdateResponse> future : futures) {
assertUpdateResponse(future.get());
}
// at this point there shouldn't be any tasks running
assertEquals(0, exec.shutdownNow().size());
} finally {
if (exec != null) {
exec.shutdownNow();
}
}
}
} else {
// send in a batch.
String col = collections.get(random().nextInt(collections.size()));
try (CloudSolrClient solrClient = getCloudSolrClient(cluster)) {
assertUpdateResponse(solrClient.add(col, Arrays.asList(solrInputDocuments), commitWithin));
}
}
String col = collections.get(random().nextInt(collections.size()));
if (commitWithin == -1) {
getSolrClient().commit(col);
} else {
// check that it all got committed eventually
String docsQ =
"{!terms f=id}"
+ Arrays.stream(solrInputDocuments).map(d -> d.getFieldValue("id").toString())
.collect(Collectors.joining(","));
int numDocs = queryNumDocs(docsQ);
if (numDocs == solrInputDocuments.length) {
System.err.println("Docs committed sooner than expected. Bug or slow test env?");
return;
}
// wait until it's committed
Thread.sleep(commitWithin);
for (int idx = 0; idx < 100; ++idx) { // Loop for up to 10 seconds waiting for commit to catch up
numDocs = queryNumDocs(docsQ);
if (numDocs == solrInputDocuments.length) break;
Thread.sleep(100);
}
assertEquals("not committed. Bug or a slow test?",
solrInputDocuments.length, numDocs);
}
}
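/** Fails if the update response header reports any (possibly tolerated) errors. */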
void assertUpdateResponse(UpdateResponse rsp) {
// use of TolerantUpdateProcessor can cause non-thrown "errors" that we need to check for
@SuppressWarnings({"rawtypes"})
List errors = (List) rsp.getResponseHeader().get("errors");
assertTrue("Expected no errors: " + errors,errors == null || errors.isEmpty());
}
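/** The number of documents matching {@code q} when querying the alias. */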
private int queryNumDocs(String q) throws SolrServerException, IOException {
return (int) getSolrClient().query(getAlias(), params("q", q, "rows", "0")).getResults().getNumFound();
}
/** Adds the docs to Solr via {@link #getSolrClient()} with the params */
@SuppressWarnings("SameParameterValue")
protected UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException {
UpdateRequest req = new UpdateRequest();
if (params != null) {
req.setParams(new ModifiableSolrParams(params)); // copy because the params will be modified
}
req.add(docs);
return req.process(getSolrClient(), collection);
}
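/** Update processor that adds one to the value of the configured field (the {@code fieldName} init param). */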
public static class IncrementURPFactory extends FieldMutatingUpdateProcessorFactory {
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
return FieldValueMutatingUpdateProcessor.valueMutator( getSelector(), next,
(src) -> Integer.valueOf(src.toString()) + 1);
}
}
}