// blob: 8bd0c8757486a4540136436bcc00638008617d41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
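/**
* Tests for collection reindexing as issued through
* {@link CollectionAdminRequest#reindexCollection(String)}: basic copying, reindexing onto the
* same collection name, reindexing with a different configset, reshaping the target collection,
* failure handling and aborting a running reindexing.
*/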
@LogLevel("org.apache.solr.cloud.api.collections.ReindexCollectionCmd=DEBUG")
public class ReindexCollectionTest extends SolrCloudTestCase {
/**
* Configures the shared test cluster once for the whole class and registers the three
* configsets that the tests refer to as {@code conf1}, {@code conf2} and {@code conf3}.
*/
@BeforeClass
public static void setupCluster() throws Exception {
// NOTE(review): the argument is presumably the number of nodes to start - confirm against SolrCloudTestCase
configureCluster(2)
// conf1 ("cloud-minimal"): only *_s
.addConfig("conf1", configset("cloud-minimal"))
// conf2 ("cloud-dynamic"): every combination of field flags
.addConfig("conf2", configset("cloud-dynamic"))
// conf3 ("cloud-minimal-inplace-updates"): catch-all * field, indexed+stored
.addConfig("conf3", configset("cloud-minimal-inplace-updates"))
.configure();
}
// client used for all requests in the tests; created in doBefore() and closed in doAfter()
private CloudSolrClient solrClient;
// obtained in doBefore() from the ZkController of the Jetty runner at index 0
private SolrCloudManager cloudManager;
// obtained from cloudManager in doBefore(); getState() reads the reindexing state through it
private DistribStateManager stateManager;
/**
 * Per-test setup: looks up the cluster and state managers through the ZkController of the
 * Jetty runner at index 0 and builds a fresh {@link CloudSolrClient} that connects via the
 * same ZooKeeper address.
 */
@Before
public void doBefore() throws Exception {
  final ZkController controller = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
  cloudManager = controller.getSolrCloudManager();
  stateManager = cloudManager.getDistribStateManager();
  // single ZooKeeper host, no chroot
  final List<String> zkHosts = Collections.singletonList(controller.getZkServerAddress());
  solrClient = new CloudSolrClientBuilder(zkHosts, Optional.empty()).build();
}
/**
 * Reads the reindexing state recorded for a collection via
 * {@link ReindexCollectionCmd#getReindexingState} and converts its
 * {@link ReindexCollectionCmd#STATE} entry to a {@link ReindexCollectionCmd.State}.
 *
 * @param collection name of the collection whose reindexing state is looked up
 * @return whatever {@code ReindexCollectionCmd.State.get} yields for the stored value; callers
 *         in this class compare the result against {@code null} to detect "no reindexing state"
 * @throws AssertionError if reading the state fails for any reason; the original exception is
 *         attached as the cause so the failure can be diagnosed from the test report
 */
private ReindexCollectionCmd.State getState(String collection) {
  try {
    return ReindexCollectionCmd.State.get(ReindexCollectionCmd
        .getReindexingState(stateManager, collection)
        .get(ReindexCollectionCmd.STATE));
  } catch (Exception e) {
    // Same failure type and message as fail(...), but keeps the stack trace of the root cause.
    throw new AssertionError("Unexpected exception checking state of " + collection + ": " + e, e);
  }
}
/**
 * Polls {@link #getState(String)} every 500 ms, for up to 30 seconds measured with the cloud
 * manager's time source, until the collection reports the expected reindexing state.
 *
 * @param collection collection whose reindexing state is polled
 * @param expected state to wait for
 * @throws Exception if the expected state was not observed before the timeout
 */
private void waitForState(String collection, ReindexCollectionCmd.State expected) throws Exception {
  final TimeOut deadline = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
  ReindexCollectionCmd.State lastSeen = null;
  // the pause between polls lives in the loop's update clause
  for (; !deadline.hasTimedOut(); deadline.sleep(500)) {
    lastSeen = getState(collection);
    if (lastSeen == expected) {
      return;
    }
  }
  throw new Exception("timeout waiting for state, current=" + lastSeen + ", expected=" + expected);
}
/**
 * Per-test cleanup: deletes all collections, closes the client created in
 * {@link #doBefore()} and resets {@link TestInjection}.
 *
 * <p>The steps are chained with {@code finally} so that a failure in an earlier step cannot
 * leak the client or leave injected faults (for example {@code reindexFailure} or
 * {@code reindexLatch}) active for the next test.
 */
@After
public void doAfter() throws Exception {
  try {
    cluster.deleteAllCollections(); // deletes aliases too
  } finally {
    try {
      if (null != solrClient) {
        solrClient.close();
        solrClient = null;
      }
    } finally {
      TestInjection.reset();
    }
  }
}
// Number of documents the tests index into a source collection before reindexing it.
private static final int NUM_DOCS = 200; // at least two batches, default batchSize=100
/**
 * Reindexes a populated collection into a new target and checks that the reported
 * {@code inputDocs} / {@code processedDocs} counters and the number of documents found in the
 * target all equal {@link #NUM_DOCS}.
 */
@Test
public void testBasicReindexing() throws Exception {
  final String source = "basicReindexing";
  final String target = "basicReindexingTarget";

  createCollection(source, "conf1", 2, 2);
  indexDocs(source, NUM_DOCS, docId -> {
    final String value = String.valueOf(docId);
    return new SolrInputDocument("id", value, "string_s", value);
  });

  final CollectionAdminResponse response =
      CollectionAdminRequest.reindexCollection(source).setTarget(target).process(solrClient);

  // the response must carry a reindexing status section with matching counters
  final Object rawStatus = response.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
  assertNotNull(response.toString(), rawStatus);
  @SuppressWarnings({"unchecked"})
  final Map<String, Object> status = (Map<String, Object>) rawStatus;
  final long expectedDocs = NUM_DOCS;
  assertEquals(status.toString(), expectedDocs, ((Number) status.get("inputDocs")).longValue());
  assertEquals(status.toString(), expectedDocs, ((Number) status.get("processedDocs")).longValue());

  // wait until the target collection is flagged as finished
  CloudUtil.waitForState(cloudManager, "did not finish copying in time", target,
      (liveNodes, coll) ->
          ReindexCollectionCmd.State.FINISHED
              == ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE)));

  // verify the target docs exist
  final long found = solrClient.query(target, params(CommonParams.Q, "*:*")).getResults().getNumFound();
  assertEquals("copied num docs", NUM_DOCS, found);
}
/**
 * Runs the same-target scenario for every combination of the {@code removeSource} and
 * {@code followAliases} flags, in the order (false,false), (false,true), (true,false),
 * (true,true).
 */
@Test
public void testSameTargetReindexing() throws Exception {
  final boolean[] flagValues = {false, true};
  for (boolean sourceRemove : flagValues) {
    for (boolean followAliases : flagValues) {
      doTestSameTargetReindexing(sourceRemove, followAliases);
    }
  }
}
/**
 * Reindexes a collection onto its own name and verifies the outcome.
 *
 * <p>The test waits for a collection whose name starts with
 * {@link ReindexCollectionCmd#TARGET_COL_PREFIX} followed by the requested name, waits for it
 * to reach the FINISHED state, then queries the requested name and expects all documents.
 *
 * @param sourceRemove value passed to {@code setRemoveSource}; when true the original
 *        collection must no longer be present in the cluster state afterwards
 * @param followAliases value passed to {@code setFollowAliases}
 */
private void doTestSameTargetReindexing(boolean sourceRemove, boolean followAliases) throws Exception {
  final String sourceCollection = "sameTargetReindexing_" + sourceRemove + "_" + followAliases;
  // source and target deliberately share one name
  final String targetCollection = sourceCollection;

  createCollection(sourceCollection, "conf1", 2, 2);
  indexDocs(sourceCollection, NUM_DOCS, docId -> {
    final String value = String.valueOf(docId);
    return new SolrInputDocument("id", value, "string_s", value);
  });

  final CollectionAdminRequest.ReindexCollection request =
      CollectionAdminRequest.reindexCollection(sourceCollection).setTarget(targetCollection);
  request.setRemoveSource(sourceRemove);
  request.setFollowAliases(followAliases);
  request.process(solrClient);

  // look for the generated target collection, polling every 500 ms for at most 30 s
  final TimeOut deadline = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
  final String prefix = ReindexCollectionCmd.TARGET_COL_PREFIX + targetCollection;
  String realTargetCollection = null;
  while (realTargetCollection == null && !deadline.hasTimedOut()) {
    deadline.sleep(500);
    realTargetCollection = cloudManager.getClusterStateProvider().getClusterState()
        .getCollectionsMap().keySet().stream()
        .filter(name -> name.startsWith(prefix))
        .findFirst()
        .orElse(null);
  }
  assertNotNull("target collection not present after 30s", realTargetCollection);

  CloudUtil.waitForState(cloudManager, "did not finish copying in time", realTargetCollection,
      (liveNodes, coll) ->
          ReindexCollectionCmd.State.FINISHED
              == ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE)));

  // refresh the client's view of aliases before querying by the requested name
  solrClient.getZkStateReader().aliasesManager.update();

  // verify the target docs exist
  final long found =
      solrClient.query(targetCollection, params(CommonParams.Q, "*:*")).getResults().getNumFound();
  assertEquals("copied num docs", NUM_DOCS, found);

  final ClusterState clusterState = solrClient.getClusterStateProvider().getClusterState();
  if (sourceRemove) {
    assertFalse("source collection still present", clusterState.hasCollection(sourceCollection));
  }
}
/**
 * Reindexes from a {@code conf2} collection into a {@code conf3} target and checks that all
 * documents arrive, that {@code string_s} still mirrors {@code id}, and that the
 * {@code sind} field is absent from the returned target documents.
 */
@Test
public void testLossySchema() throws Exception {
  final String source = "sourceLossyReindexing";
  final String target = "targetLossyReindexing";

  createCollection(source, "conf2", 2, 2);
  indexDocs(source, NUM_DOCS, docId -> {
    final String value = String.valueOf(docId);
    // "sind": indexed=true, stored=false, will be lost...
    return new SolrInputDocument(
        "id", value,
        "string_s", value,
        "sind", "this is a test " + docId);
  });

  final CollectionAdminRequest.ReindexCollection request =
      CollectionAdminRequest.reindexCollection(source);
  request.setTarget(target).setConfigName("conf3");
  request.process(solrClient);

  CloudUtil.waitForState(cloudManager, "did not finish copying in time", target,
      (liveNodes, coll) ->
          ReindexCollectionCmd.State.FINISHED
              == ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE)));

  // verify the target docs exist
  final QueryResponse queryResponse = solrClient.query(target, params(CommonParams.Q, "*:*"));
  assertEquals("copied num docs", NUM_DOCS, queryResponse.getResults().getNumFound());
  queryResponse.getResults().forEach(doc -> {
    final String id = (String) doc.getFieldValue("id");
    assertEquals(id, doc.getFieldValue("string_s"));
    assertFalse(doc.containsKey("sind")); // lost in translation ...
  });
}
/**
 * Reindexes into a target with a different shape (three named shards, replication factor 1,
 * implicit router) while restricting the copied documents with {@code q} and the copied
 * fields with {@code fl}, then verifies both the documents and the layout of the target.
 */
@Test
public void testReshapeReindexing() throws Exception {
  final String source = "reshapeReindexing";
  final String target = "reshapeReindexingTarget";

  createCollection(source, "conf1", 2, 2);
  indexDocs(source, NUM_DOCS, docId -> {
    final String value = String.valueOf(docId);
    return new SolrInputDocument(
        "id", value,
        "string_s", value,
        "remove_s", value);
  });

  final CollectionAdminRequest.ReindexCollection request =
      CollectionAdminRequest.reindexCollection(source).setTarget(target);
  request
      .setCollectionParam(ZkStateReader.NUM_SHARDS_PROP, 3)
      .setCollectionParam(ZkStateReader.REPLICATION_FACTOR, 1)
      .setCollectionParam("router.name", ImplicitDocRouter.NAME)
      .setCollectionParam("shards", "foo,bar,baz")
      .setCollectionParam("fl", "id,string_s")
      .setCollectionParam("q", "id:10*");
  request.process(solrClient);

  CloudUtil.waitForState(cloudManager, "did not finish copying in time", target,
      (liveNodes, docCollection) ->
          ReindexCollectionCmd.State.FINISHED
              == ReindexCollectionCmd.State.get(docCollection.getStr(ReindexCollectionCmd.REINDEXING_STATE)));

  // verify the target docs exist
  final QueryResponse queryResponse = solrClient.query(target, params(CommonParams.Q, "*:*"));
  // 10 and 100-109
  assertEquals("copied num docs", 11, queryResponse.getResults().getNumFound());
  // verify the correct fields exist
  queryResponse.getResults().forEach(doc -> {
    assertNotNull(doc.getFieldValue("id"));
    assertNotNull(doc.getFieldValue("string_s"));
    assertNull(doc.getFieldValue("remove_s"));
  });

  // check the shape of the new collection; the requested name is resolved through aliases first
  final ClusterState clusterState = solrClient.getClusterStateProvider().getClusterState();
  final List<String> resolved = solrClient.getZkStateReader().getAliases().resolveAliases(target);
  assertFalse(resolved.isEmpty());
  final DocCollection coll = clusterState.getCollection(resolved.get(0));
  assertNotNull(coll);
  assertEquals(3, coll.getSlices().size());
  for (String shard : new String[] {"foo", "bar", "baz"}) {
    assertNotNull(shard, coll.getSlice(shard));
  }
  assertEquals(Integer.valueOf(1), coll.getReplicationFactor());
  assertEquals(ImplicitDocRouter.NAME, coll.getRouter().getName());
}
/**
 * Covers three failing reindex requests: the target is an existing collection, the target is
 * an existing alias, and a failure injected through {@link TestInjection#reindexFailure}.
 * After the injected failure no collection with the target or checkpoint prefix may remain
 * and the source must be writable again without any reindexing state.
 */
@Test
public void testFailure() throws Exception {
  final String source = "failReindexing";
  final String target = "failReindexingTarget";
  final String alias = "failAlias";

  createCollection(source, "conf1", 2, 2);
  createCollection(target, "conf1", 1, 1);
  CollectionAdminRequest.createAlias(alias, target).process(solrClient);
  indexDocs(source, NUM_DOCS, docId -> {
    final String value = String.valueOf(docId);
    return new SolrInputDocument("id", value, "string_s", value);
  });

  // case 1: the target names an existing collection
  CollectionAdminResponse response =
      CollectionAdminRequest.reindexCollection(source).setTarget(target).process(solrClient);
  Object error = response.getResponse().get("error");
  assertNotNull(error);
  assertTrue(response.toString(), error.toString().contains("already exists"));

  // case 2: the target names an existing alias
  response = CollectionAdminRequest.reindexCollection(source).setTarget(alias).process(solrClient);
  error = response.getResponse().get("error");
  assertNotNull(error);
  assertTrue(response.toString(), error.toString().contains("already exists"));

  // case 3: free the target name, then make the reindexing itself fail
  CollectionAdminRequest.deleteAlias(alias).process(solrClient);
  CollectionAdminRequest.deleteCollection(target).process(solrClient);
  final CollectionAdminRequest.ReindexCollection failingRequest =
      CollectionAdminRequest.reindexCollection(source).setTarget(target);
  TestInjection.reindexFailure = "true:100";
  response = failingRequest.process(solrClient);
  error = response.getResponse().get("error");
  assertNotNull(error);
  assertTrue(response.toString(), error.toString().contains("waiting for daemon"));

  // verify that the target and checkpoint collections don't exist
  cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
    final String name = coll.getName();
    assertFalse(name + " still exists", name.startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX));
    assertFalse(name + " still exists", name.startsWith(ReindexCollectionCmd.CHK_COL_PREFIX));
  });

  // verify that the source collection is read-write and has no reindexing flags
  CloudUtil.waitForState(cloudManager, "collection state is incorrect", source,
      (liveNodes, collectionState) -> {
        if (collectionState.isReadOnly()) {
          return false;
        }
        if (collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) != null) {
          return false;
        }
        return getState(source) == null;
      });
}
/**
 * Starts an asynchronous reindexing with {@link TestInjection#reindexLatch} set, aborts it
 * while the latch is still closed, and verifies the reported states: {@code aborting} from
 * the abort request, {@code aborted} from a later status request, and {@code aborted} in the
 * final async response once the latch is released and the source is writable again.
 *
 * <p>The latch is released in a {@code finally} block as well, so a failed assertion or wait
 * does not leave the test's latch un-released; a second {@code countDown()} on an
 * already-open latch has no effect.
 */
@Test
@SuppressWarnings({"unchecked"})
public void testAbort() throws Exception {
  final String sourceCollection = "abortReindexing";
  final String targetCollection = "abortReindexingTarget";
  createCollection(sourceCollection, "conf1", 2, 1);
  // keep a local handle: the static field may be changed elsewhere, this reference cannot
  final CountDownLatch latch = new CountDownLatch(1);
  TestInjection.reindexLatch = latch;
  try {
    CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
        .setTarget(targetCollection);
    String asyncId = req.processAsync(solrClient);
    // wait for the source collection to be put in readOnly mode
    CloudUtil.waitForState(cloudManager, "source collection didn't become readOnly",
        sourceCollection, (liveNodes, coll) -> coll.isReadOnly());

    // request the abort with a second, synchronous request
    req = CollectionAdminRequest.reindexCollection(sourceCollection);
    req.setCommand("abort");
    CollectionAdminResponse rsp = req.process(solrClient);
    Map<String, Object> status = (Map<String, Object>) rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
    assertNotNull(rsp.toString(), status);
    assertEquals(status.toString(), "aborting", status.get("state"));

    // while the latch is closed the source stays read-only and is marked ABORTED
    CloudUtil.waitForState(cloudManager, "incorrect collection state", sourceCollection,
        ((liveNodes, collectionState) ->
            collectionState.isReadOnly() &&
            getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED));

    // verify status
    req.setCommand("status");
    rsp = req.process(solrClient);
    status = (Map<String, Object>) rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
    assertNotNull(rsp.toString(), status);
    assertEquals(status.toString(), "aborted", status.get("state"));

    // let the process continue
    latch.countDown();
    CloudUtil.waitForState(cloudManager, "source collection is in wrong state",
        sourceCollection, (liveNodes, docCollection) -> !docCollection.isReadOnly() && getState(sourceCollection) == null);

    // verify the response
    rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
    status = (Map<String, Object>) rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
    assertNotNull(rsp.toString(), status);
    assertEquals(status.toString(), "aborted", status.get("state"));
  } finally {
    // no-op on the success path, releases the latch on every failure path
    latch.countDown();
  }
}
/**
 * Creates a collection with {@code maxShardsPerNode} set to -1 and waits until it is active
 * with the expected number of shards and replicas.
 *
 * @param name collection name
 * @param config name of a configset registered in {@link #setupCluster()}
 * @param numShards number of shards to create
 * @param numReplicas number of replicas per shard
 */
private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception {
  final CollectionAdminRequest.Create create =
      CollectionAdminRequest.createCollection(name, config, numShards, numReplicas);
  create.setMaxShardsPerNode(-1);
  create.process(solrClient);

  final int totalReplicas = numShards * numReplicas;
  cluster.waitForActiveCollection(name, numShards, totalReplicas);
}
/**
 * Generates {@code numDocs} documents, adds them to the collection in one request, commits,
 * and verifies with a match-all query that the collection now reports exactly
 * {@code numDocs} documents.
 *
 * <p>The verification compares against the total hit count, so it only holds when the
 * collection was empty beforehand, as it is for every caller in this class.
 *
 * @param collection collection to index into
 * @param numDocs number of documents to generate; the generator is called with 0..numDocs-1
 * @param generator maps a document index to the document to add
 */
private void indexDocs(String collection, int numDocs, Function<Integer, SolrInputDocument> generator) throws Exception {
  List<SolrInputDocument> docs = new ArrayList<>();
  for (int i = 0; i < numDocs; i++) {
    docs.add(generator.apply(i));
  }
  solrClient.add(collection, docs);
  solrClient.commit(collection);
  // verify the docs exist; compare against the requested count, not the NUM_DOCS constant
  QueryResponse rsp = solrClient.query(collection, params(CommonParams.Q, "*:*"));
  assertEquals("num docs", numDocs, rsp.getResults().getNumFound());
}
}