| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.solr.cloud; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| |
| import org.apache.solr.client.solrj.cloud.DistribStateManager; |
| import org.apache.solr.client.solrj.cloud.SolrCloudManager; |
| import org.apache.solr.client.solrj.impl.CloudSolrClient; |
| import org.apache.solr.client.solrj.request.CollectionAdminRequest; |
| import org.apache.solr.client.solrj.response.CollectionAdminResponse; |
| import org.apache.solr.client.solrj.response.QueryResponse; |
| import org.apache.solr.cloud.api.collections.ReindexCollectionCmd; |
| import org.apache.solr.common.SolrDocument; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.ImplicitDocRouter; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.params.CommonParams; |
| import org.apache.solr.util.LogLevel; |
| import org.apache.solr.util.TestInjection; |
| import org.apache.solr.util.TimeOut; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| /** |
| * |
| */ |
| @LogLevel("org.apache.solr.cloud.api.collections.ReindexCollectionCmd=DEBUG") |
| public class ReindexCollectionTest extends SolrCloudTestCase { |
| |
| @BeforeClass |
| public static void setupCluster() throws Exception { |
| configureCluster(2) |
| // only *_s |
| .addConfig("conf1", configset("cloud-minimal")) |
| // every combination of field flags |
| .addConfig("conf2", configset("cloud-dynamic")) |
| // catch-all * field, indexed+stored |
| .addConfig("conf3", configset("cloud-minimal-inplace-updates")) |
| .configure(); |
| } |
| |
| private CloudSolrClient solrClient; |
| private SolrCloudManager cloudManager; |
| private DistribStateManager stateManager; |
| |
| @Before |
| public void doBefore() throws Exception { |
| ZkController zkController = cluster.getJettySolrRunner(0).getCoreContainer().getZkController(); |
| cloudManager = zkController.getSolrCloudManager(); |
| stateManager = cloudManager.getDistribStateManager(); |
| solrClient = new CloudSolrClientBuilder(Collections.singletonList(zkController.getZkServerAddress()), |
| Optional.empty()).build(); |
| } |
| |
| private ReindexCollectionCmd.State getState(String collection) { |
| try { |
| return ReindexCollectionCmd.State.get(ReindexCollectionCmd |
| .getReindexingState(stateManager, collection) |
| .get(ReindexCollectionCmd.STATE)); |
| } catch (Exception e) { |
| fail("Unexpected exception checking state of " + collection + ": " + e); |
| return null; |
| } |
| } |
| |
| private void waitForState(String collection, ReindexCollectionCmd.State expected) throws Exception { |
| TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource()); |
| ReindexCollectionCmd.State current = null; |
| while (!timeOut.hasTimedOut()) { |
| current = getState(collection); |
| if (expected == current) { |
| return; |
| } |
| timeOut.sleep(500); |
| } |
| throw new Exception("timeout waiting for state, current=" + current + ", expected=" + expected); |
| } |
| |
| @After |
| public void doAfter() throws Exception { |
| cluster.deleteAllCollections(); // deletes aliases too |
| |
| if (null != solrClient) { |
| solrClient.close(); |
| solrClient = null; |
| } |
| |
| TestInjection.reset(); |
| } |
| |
| private static final int NUM_DOCS = 200; // at least two batches, default batchSize=100 |
| |
| @Test |
| public void testBasicReindexing() throws Exception { |
| final String sourceCollection = "basicReindexing"; |
| |
| createCollection(sourceCollection, "conf1", 2, 2); |
| |
| indexDocs(sourceCollection, NUM_DOCS, |
| i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i))); |
| |
| final String targetCollection = "basicReindexingTarget"; |
| |
| CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection) |
| .setTarget(targetCollection); |
| CollectionAdminResponse rsp = req.process(solrClient); |
| assertNotNull(rsp.toString(), rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS)); |
| @SuppressWarnings({"unchecked"}) |
| Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS); |
| assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("inputDocs")).longValue()); |
| assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("processedDocs")).longValue()); |
| |
| CloudUtil.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> { |
| ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE)); |
| return ReindexCollectionCmd.State.FINISHED == state; |
| }); |
| |
| // verify the target docs exist |
| QueryResponse queryResponse = solrClient.query(targetCollection, params(CommonParams.Q, "*:*")); |
| assertEquals("copied num docs", NUM_DOCS, queryResponse.getResults().getNumFound()); |
| } |
| |
| @Test |
| public void testSameTargetReindexing() throws Exception { |
| doTestSameTargetReindexing(false, false); |
| doTestSameTargetReindexing(false, true); |
| doTestSameTargetReindexing(true, false); |
| doTestSameTargetReindexing(true, true); |
| } |
| |
| private void doTestSameTargetReindexing(boolean sourceRemove, boolean followAliases) throws Exception { |
| final String sourceCollection = "sameTargetReindexing_" + sourceRemove + "_" + followAliases; |
| final String targetCollection = sourceCollection; |
| |
| createCollection(sourceCollection, "conf1", 2, 2); |
| indexDocs(sourceCollection, NUM_DOCS, |
| i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i))); |
| |
| CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection) |
| .setTarget(targetCollection); |
| req.setRemoveSource(sourceRemove); |
| req.setFollowAliases(followAliases); |
| req.process(solrClient); |
| |
| String realTargetCollection = null; |
| TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource()); |
| String prefix = ReindexCollectionCmd.TARGET_COL_PREFIX + targetCollection; |
| while (!timeOut.hasTimedOut()) { |
| timeOut.sleep(500); |
| for (String name : cloudManager.getClusterStateProvider().getClusterState().getCollectionsMap().keySet()) { |
| if (name.startsWith(prefix)) { |
| realTargetCollection = name; |
| break; |
| } |
| } |
| if (realTargetCollection != null) { |
| break; |
| } |
| } |
| assertNotNull("target collection not present after 30s", realTargetCollection); |
| |
| CloudUtil.waitForState(cloudManager, "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> { |
| ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE)); |
| return ReindexCollectionCmd.State.FINISHED == state; |
| }); |
| solrClient.getZkStateReader().aliasesManager.update(); |
| // verify the target docs exist |
| QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*")); |
| assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound()); |
| ClusterState state = solrClient.getClusterStateProvider().getClusterState(); |
| if (sourceRemove) { |
| assertFalse("source collection still present", state.hasCollection(sourceCollection)); |
| } |
| } |
| |
| @Test |
| public void testLossySchema() throws Exception { |
| final String sourceCollection = "sourceLossyReindexing"; |
| final String targetCollection = "targetLossyReindexing"; |
| |
| |
| createCollection(sourceCollection, "conf2", 2, 2); |
| |
| indexDocs(sourceCollection, NUM_DOCS, i -> |
| new SolrInputDocument( |
| "id", String.valueOf(i), |
| "string_s", String.valueOf(i), |
| "sind", "this is a test " + i)); // "sind": indexed=true, stored=false, will be lost... |
| |
| CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection) |
| .setTarget(targetCollection) |
| .setConfigName("conf3"); |
| req.process(solrClient); |
| |
| CloudUtil.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> { |
| ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE)); |
| return ReindexCollectionCmd.State.FINISHED == state; |
| }); |
| // verify the target docs exist |
| QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*")); |
| assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound()); |
| for (SolrDocument doc : rsp.getResults()) { |
| String id = (String)doc.getFieldValue("id"); |
| assertEquals(id, doc.getFieldValue("string_s")); |
| assertFalse(doc.containsKey("sind")); // lost in translation ... |
| } |
| } |
| |
| @Test |
| public void testReshapeReindexing() throws Exception { |
| final String sourceCollection = "reshapeReindexing"; |
| final String targetCollection = "reshapeReindexingTarget"; |
| createCollection(sourceCollection, "conf1", 2, 2); |
| indexDocs(sourceCollection, NUM_DOCS, |
| i -> new SolrInputDocument( |
| "id", String.valueOf(i), |
| "string_s", String.valueOf(i), |
| "remove_s", String.valueOf(i))); |
| |
| CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection) |
| .setTarget(targetCollection) |
| .setCollectionParam(ZkStateReader.NUM_SHARDS_PROP, 3) |
| .setCollectionParam(ZkStateReader.REPLICATION_FACTOR, 1) |
| .setCollectionParam("router.name", ImplicitDocRouter.NAME) |
| .setCollectionParam("shards", "foo,bar,baz") |
| .setCollectionParam("fl", "id,string_s") |
| .setCollectionParam("q", "id:10*"); |
| req.process(solrClient); |
| |
| CloudUtil.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> { |
| ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE)); |
| return ReindexCollectionCmd.State.FINISHED == state; |
| }); |
| // verify the target docs exist |
| QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*")); |
| |
| // 10 and 100-109 |
| assertEquals("copied num docs", 11, rsp.getResults().getNumFound()); |
| // verify the correct fields exist |
| for (SolrDocument doc : rsp.getResults()) { |
| assertNotNull(doc.getFieldValue("id")); |
| assertNotNull(doc.getFieldValue("string_s")); |
| assertNull(doc.getFieldValue("remove_s")); |
| } |
| |
| // check the shape of the new collection |
| ClusterState clusterState = solrClient.getClusterStateProvider().getClusterState(); |
| List<String> aliases = solrClient.getZkStateReader().getAliases().resolveAliases(targetCollection); |
| assertFalse(aliases.isEmpty()); |
| String realTargetCollection = aliases.get(0); |
| DocCollection coll = clusterState.getCollection(realTargetCollection); |
| assertNotNull(coll); |
| assertEquals(3, coll.getSlices().size()); |
| assertNotNull("foo", coll.getSlice("foo")); |
| assertNotNull("bar", coll.getSlice("bar")); |
| assertNotNull("baz", coll.getSlice("baz")); |
| assertEquals(Integer.valueOf(1), coll.getReplicationFactor()); |
| assertEquals(ImplicitDocRouter.NAME, coll.getRouter().getName()); |
| } |
| |
| @Test |
| public void testFailure() throws Exception { |
| final String sourceCollection = "failReindexing"; |
| final String targetCollection = "failReindexingTarget"; |
| final String aliasTarget = "failAlias"; |
| createCollection(sourceCollection, "conf1", 2, 2); |
| createCollection(targetCollection, "conf1", 1, 1); |
| CollectionAdminRequest.createAlias(aliasTarget, targetCollection).process(solrClient); |
| indexDocs(sourceCollection, NUM_DOCS, |
| i -> new SolrInputDocument( |
| "id", String.valueOf(i), |
| "string_s", String.valueOf(i))); |
| |
| CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection) |
| .setTarget(targetCollection); |
| CollectionAdminResponse rsp = req.process(solrClient); |
| assertNotNull(rsp.getResponse().get("error")); |
| assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists")); |
| |
| req = CollectionAdminRequest.reindexCollection(sourceCollection) |
| .setTarget(aliasTarget); |
| rsp = req.process(solrClient); |
| assertNotNull(rsp.getResponse().get("error")); |
| assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists")); |
| |
| CollectionAdminRequest.deleteAlias(aliasTarget).process(solrClient); |
| CollectionAdminRequest.deleteCollection(targetCollection).process(solrClient); |
| |
| req = CollectionAdminRequest.reindexCollection(sourceCollection) |
| .setTarget(targetCollection); |
| |
| TestInjection.reindexFailure = "true:100"; |
| rsp = req.process(solrClient); |
| assertNotNull(rsp.getResponse().get("error")); |
| assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("waiting for daemon")); |
| |
| // verify that the target and checkpoint collections don't exist |
| cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> { |
| assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX)); |
| assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.CHK_COL_PREFIX)); |
| }); |
| // verify that the source collection is read-write and has no reindexing flags |
| CloudUtil.waitForState(cloudManager, "collection state is incorrect", sourceCollection, |
| ((liveNodes, collectionState) -> |
| !collectionState.isReadOnly() && |
| collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null && |
| getState(sourceCollection) == null)); |
| } |
| |
| @Test |
| @SuppressWarnings({"unchecked"}) |
| public void testAbort() throws Exception { |
| final String sourceCollection = "abortReindexing"; |
| final String targetCollection = "abortReindexingTarget"; |
| createCollection(sourceCollection, "conf1", 2, 1); |
| |
| TestInjection.reindexLatch = new CountDownLatch(1); |
| CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection) |
| .setTarget(targetCollection); |
| String asyncId = req.processAsync(solrClient); |
| // wait for the source collection to be put in readOnly mode |
| CloudUtil.waitForState(cloudManager, "source collection didn't become readOnly", |
| sourceCollection, (liveNodes, coll) -> coll.isReadOnly()); |
| |
| req = CollectionAdminRequest.reindexCollection(sourceCollection); |
| req.setCommand("abort"); |
| CollectionAdminResponse rsp = req.process(solrClient); |
| Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS); |
| assertNotNull(rsp.toString(), status); |
| assertEquals(status.toString(), "aborting", status.get("state")); |
| |
| CloudUtil.waitForState(cloudManager, "incorrect collection state", sourceCollection, |
| ((liveNodes, collectionState) -> |
| collectionState.isReadOnly() && |
| getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED)); |
| |
| // verify status |
| req.setCommand("status"); |
| rsp = req.process(solrClient); |
| status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS); |
| assertNotNull(rsp.toString(), status); |
| assertEquals(status.toString(), "aborted", status.get("state")); |
| // let the process continue |
| TestInjection.reindexLatch.countDown(); |
| CloudUtil.waitForState(cloudManager, "source collection is in wrong state", |
| sourceCollection, (liveNodes, docCollection) -> !docCollection.isReadOnly() && getState(sourceCollection) == null); |
| // verify the response |
| rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient); |
| status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS); |
| assertNotNull(rsp.toString(), status); |
| assertEquals(status.toString(), "aborted", status.get("state")); |
| } |
| |
| private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception { |
| CollectionAdminRequest.createCollection(name, config, numShards, numReplicas) |
| .setMaxShardsPerNode(-1) |
| .process(solrClient); |
| |
| cluster.waitForActiveCollection(name, numShards, numShards * numReplicas); |
| } |
| |
| private void indexDocs(String collection, int numDocs, Function<Integer, SolrInputDocument> generator) throws Exception { |
| List<SolrInputDocument> docs = new ArrayList<>(); |
| for (int i = 0; i < numDocs; i++) { |
| docs.add(generator.apply(i)); |
| } |
| solrClient.add(collection, docs); |
| solrClient.commit(collection); |
| // verify the docs exist |
| QueryResponse rsp = solrClient.query(collection, params(CommonParams.Q, "*:*")); |
| |
| assertEquals("num docs", NUM_DOCS, rsp.getResults().getNumFound()); |
| |
| } |
| } |