| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.solr.update; |
| |
| import static org.hamcrest.core.StringContains.containsString; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import org.apache.lucene.index.IndexWriter; |
| import org.apache.lucene.index.NoMergePolicy; |
| import org.apache.lucene.tests.util.TestUtil; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.impl.HttpSolrClient; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field; |
| import org.apache.solr.client.solrj.response.UpdateResponse; |
| import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldResponse; |
| import org.apache.solr.cloud.AbstractFullDistribZkTestBase; |
| import org.apache.solr.cloud.ZkShardTerms; |
| import org.apache.solr.common.SolrDocument; |
| import org.apache.solr.common.SolrDocumentList; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.params.SolrParams; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.common.util.SolrNamedThreadFactory; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.embedded.JettySolrRunner; |
| import org.apache.solr.index.NoMergePolicyFactory; |
| import org.apache.solr.update.processor.DistributedUpdateProcessor; |
| import org.apache.solr.util.RefCounted; |
| import org.apache.solr.util.TimeOut; |
| import org.apache.zookeeper.KeeperException; |
| import org.hamcrest.MatcherAssert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Tests the in-place updates (docValues updates) for a one shard, three replica cluster. */ |
| public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| private final boolean onlyLeaderIndexes = random().nextBoolean(); |
| |
| @BeforeClass |
| public static void beforeSuperClass() throws Exception { |
| schemaString = "schema-inplace-updates.xml"; |
| configString = "solrconfig-tlog.xml"; |
| |
| // we need consistent segments that aren't re-ordered on merge because we're |
| // asserting inplace updates happen by checking the internal [docid] |
| systemSetPropertySolrTestsMergePolicyFactory(NoMergePolicyFactory.class.getName()); |
| |
| initCore(configString, schemaString); |
| |
| // sanity check that autocommits are disabled |
| assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime); |
| assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxTime); |
| assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs); |
| assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxDocs); |
| |
| // assert that NoMergePolicy was chosen |
| RefCounted<IndexWriter> iw = h.getCore().getSolrCoreState().getIndexWriter(h.getCore()); |
| try { |
| IndexWriter writer = iw.get(); |
| assertTrue( |
| "Actual merge policy is: " + writer.getConfig().getMergePolicy(), |
| writer.getConfig().getMergePolicy() instanceof NoMergePolicy); |
| } finally { |
| iw.decref(); |
| } |
| } |
| |
| @Override |
| protected boolean useTlogReplicas() { |
| // TODO: tlog replicas makes commits take way to long due to what is likely a bug and it's |
| // TestInjection use |
| return false; |
| } |
| |
| public TestInPlaceUpdatesDistrib() { |
| super(); |
| sliceCount = 1; |
| fixShardCount(3); |
| } |
| |
| private SolrClient LEADER = null; |
| private List<SolrClient> NONLEADERS = null; |
| |
| @Test |
| @ShardsFixed(num = 3) |
| public void test() throws Exception { |
| waitForRecoveriesToFinish(true); |
| |
| resetDelays(); |
| |
| mapReplicasToClients(); |
| |
| clearIndex(); |
| commit(); |
| |
| // sanity check no one broke the assumptions we make about our schema |
| checkExpectedSchemaField( |
| map( |
| "name", |
| "inplace_updatable_int", |
| "type", |
| "int", |
| "stored", |
| Boolean.FALSE, |
| "indexed", |
| Boolean.FALSE, |
| "docValues", |
| Boolean.TRUE)); |
| checkExpectedSchemaField( |
| map( |
| "name", |
| "inplace_updatable_float", |
| "type", |
| "float", |
| "stored", |
| Boolean.FALSE, |
| "indexed", |
| Boolean.FALSE, |
| "docValues", |
| Boolean.TRUE)); |
| checkExpectedSchemaField( |
| map( |
| "name", |
| "_version_", |
| "type", |
| "long", |
| "stored", |
| Boolean.FALSE, |
| "indexed", |
| Boolean.FALSE, |
| "docValues", |
| Boolean.TRUE)); |
| |
| // Do the tests now: |
| |
| // AwaitsFix this test fails easily |
| // delayedReorderingFetchesMissingUpdateFromLeaderTest(); |
| |
| resetDelays(); |
| docValuesUpdateTest(); |
| resetDelays(); |
| ensureRtgWorksWithPartialUpdatesTest(); |
| resetDelays(); |
| outOfOrderUpdatesIndividualReplicaTest(); |
| resetDelays(); |
| updatingDVsInAVeryOldSegment(); |
| resetDelays(); |
| updateExistingThenNonExistentDoc(); |
| resetDelays(); |
| // TODO Should we combine all/some of these into a single test, so as to cut down on execution |
| // time? |
| reorderedDBQIndividualReplicaTest(); |
| resetDelays(); |
| reorderedDeletesTest(); |
| resetDelays(); |
| reorderedDBQsSimpleTest(); |
| resetDelays(); |
| reorderedDBQsResurrectionTest(); |
| resetDelays(); |
| setNullForDVEnabledField(); |
| resetDelays(); |
| |
| // AwaitsFix this test fails easily |
| // reorderedDBQsUsingUpdatedValueFromADroppedUpdate(); |
| } |
| |
| private void resetDelays() { |
| for (JettySolrRunner j : jettys) { |
| j.getDebugFilter().unsetDelay(); |
| } |
| } |
| |
| private void mapReplicasToClients() throws KeeperException, InterruptedException { |
| ZkStateReader zkStateReader = ZkStateReader.from(cloudClient); |
| zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION); |
| ClusterState clusterState = zkStateReader.getClusterState(); |
| Replica leader; |
| Slice shard1 = clusterState.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1); |
| leader = shard1.getLeader(); |
| |
| String leaderBaseUrl = zkStateReader.getBaseUrlForNodeName(leader.getNodeName()); |
| for (SolrClient solrClient : clients) { |
| if (((HttpSolrClient) solrClient).getBaseURL().startsWith(leaderBaseUrl)) { |
| LEADER = solrClient; |
| } |
| } |
| |
| NONLEADERS = new ArrayList<>(); |
| for (Replica rep : shard1.getReplicas()) { |
| if (rep.equals(leader)) { |
| continue; |
| } |
| String baseUrl = zkStateReader.getBaseUrlForNodeName(rep.getNodeName()); |
| for (SolrClient client : clients) { |
| if (((HttpSolrClient) client).getBaseURL().startsWith(baseUrl)) { |
| NONLEADERS.add(client); |
| } |
| } |
| } |
| |
| assertNotNull(LEADER); |
| assertEquals(2, NONLEADERS.size()); |
| } |
| |
| private void setNullForDVEnabledField() throws Exception { |
| // to test set=null |
| // should this test be here? As set null would be an atomic update |
| clearIndex(); |
| commit(); |
| |
| buildRandomIndex(0); |
| float inplace_updatable_float = 1; |
| |
| // update doc, set |
| index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float)); |
| |
| LEADER.commit(); |
| SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index |
| assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float")); |
| assertEquals("title0", sdoc.get("title_s")); |
| long version0 = (long) sdoc.get("_version_"); |
| |
| for (SolrClient client : NONLEADERS) { |
| SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false")); |
| assertEquals(inplace_updatable_float, doc.get("inplace_updatable_float")); |
| assertEquals(version0, doc.get("_version_")); |
| } |
| |
| index("id", 0, "inplace_updatable_float", map("set", null)); |
| LEADER.commit(); |
| |
| sdoc = LEADER.getById("0"); // RTG straight from the index |
| assertNull(sdoc.get("inplace_updatable_float")); |
| assertEquals("title0", sdoc.get("title_s")); |
| long version1 = (long) sdoc.get("_version_"); |
| |
| for (SolrClient client : NONLEADERS) { |
| SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false")); |
| assertNull(doc.get("inplace_updatable_float")); |
| assertEquals(version1, doc.get("_version_")); |
| } |
| } |
| |
| final int NUM_RETRIES = 100, WAIT_TIME = 50; |
| |
| // The following should work: full update to doc 0, in-place update for doc 0, delete doc 0 |
| private void reorderedDBQsSimpleTest() throws Exception { |
| |
| clearIndex(); |
| commit(); |
| |
| buildRandomIndex(0); |
| |
| float inplace_updatable_float = 1; |
| |
| // update doc, set |
| index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float)); |
| |
| LEADER.commit(); |
| SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index |
| assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float")); |
| assertEquals("title0", sdoc.get("title_s")); |
| long version0 = (long) sdoc.get("_version_"); |
| |
| // put replica out of sync |
| float newinplace_updatable_float = 100; |
| List<UpdateRequest> updates = new ArrayList<>(); |
| updates.add( |
| simulatedUpdateRequest( |
| null, |
| "id", |
| 0, |
| "title_s", |
| "title0_new", |
| "inplace_updatable_float", |
| newinplace_updatable_float, |
| "_version_", |
| version0 + 1)); // full update |
| updates.add( |
| simulatedUpdateRequest( |
| version0 + 1, |
| "id", |
| 0, |
| "inplace_updatable_float", |
| newinplace_updatable_float + 1, |
| "_version_", |
| version0 + 2)); // inplace_updatable_float=101 |
| updates.add(simulatedDeleteRequest(0, version0 + 3)); |
| |
| // order the updates correctly for NONLEADER 1 |
| for (UpdateRequest update : updates) { |
| if (log.isInfoEnabled()) { |
| log.info("Issuing well ordered update: {}", update.getDocuments()); |
| } |
| NONLEADERS.get(1).request(update); |
| } |
| |
| // Reordering needs to happen using parallel threads |
| ExecutorService threadpool = |
| ExecutorUtil.newMDCAwareFixedThreadPool( |
| updates.size() + 1, new SolrNamedThreadFactory(getTestName())); |
| |
| // re-order the updates for NONLEADER 0 |
| List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates); |
| Collections.shuffle(reorderedUpdates, random()); |
| List<Future<UpdateResponse>> updateResponses = new ArrayList<>(); |
| for (UpdateRequest update : reorderedUpdates) { |
| AsyncUpdateWithRandomCommit task = |
| new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong()); |
| updateResponses.add(threadpool.submit(task)); |
| // while we can't guarantee/trust what order the updates are executed in, since multiple |
| // threads are involved, but we're trying to bias the thread scheduling to run them in the |
| // order submitted |
| Thread.sleep(10); |
| } |
| |
| threadpool.shutdown(); |
| assertTrue( |
| "Thread pool didn't terminate within 15 secs", |
| threadpool.awaitTermination(15, TimeUnit.SECONDS)); |
| |
| // assert all requests were successful |
| for (Future<UpdateResponse> resp : updateResponses) { |
| assertEquals(0, resp.get().getStatus()); |
| } |
| |
| // assert both replicas have same effect |
| for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica |
| SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false")); |
| assertNull("This doc was supposed to have been deleted, but was: " + doc, doc); |
| } |
| |
| log.info("reorderedDBQsSimpleTest: This test passed fine..."); |
| } |
| |
| private void reorderedDBQIndividualReplicaTest() throws Exception { |
| if (onlyLeaderIndexes) { |
| log.info("RTG with DBQs are not working in tlog replicas"); |
| return; |
| } |
| clearIndex(); |
| commit(); |
| |
| // put replica out of sync |
| float newinplace_updatable_float = 100; |
| long version0 = 2000; |
| List<UpdateRequest> updates = new ArrayList<>(); |
| updates.add( |
| simulatedUpdateRequest( |
| null, |
| "id", |
| 0, |
| "title_s", |
| "title0_new", |
| "inplace_updatable_float", |
| newinplace_updatable_float, |
| "_version_", |
| version0 + 1)); // full update |
| updates.add( |
| simulatedUpdateRequest( |
| version0 + 1, |
| "id", |
| 0, |
| "inplace_updatable_float", |
| newinplace_updatable_float + 1, |
| "_version_", |
| version0 + 2)); // inplace_updatable_float=101 |
| updates.add( |
| simulatedDeleteRequest( |
| "inplace_updatable_float:" + (newinplace_updatable_float + 1), version0 + 3)); |
| |
| // Reordering needs to happen using parallel threads |
| ExecutorService threadpool = |
| ExecutorUtil.newMDCAwareFixedThreadPool( |
| updates.size() + 1, new SolrNamedThreadFactory(getTestName())); |
| |
| // re-order the updates by swapping the last two |
| List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates); |
| reorderedUpdates.set(1, updates.get(2)); |
| reorderedUpdates.set(2, updates.get(1)); |
| |
| List<Future<UpdateResponse>> updateResponses = new ArrayList<>(); |
| for (UpdateRequest update : reorderedUpdates) { |
| AsyncUpdateWithRandomCommit task = |
| new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong()); |
| updateResponses.add(threadpool.submit(task)); |
| // while we can't guarantee/trust what order the updates are executed in, since multiple |
| // threads are involved, but we're trying to bias the thread scheduling to run them in the |
| // order submitted |
| Thread.sleep(100); |
| } |
| |
| threadpool.shutdown(); |
| assertTrue( |
| "Thread pool didn't terminate within 15 secs", |
| threadpool.awaitTermination(15, TimeUnit.SECONDS)); |
| |
| // assert all requests were successful |
| for (Future<UpdateResponse> resp : updateResponses) { |
| assertEquals(0, resp.get().getStatus()); |
| } |
| |
| SolrDocument doc = NONLEADERS.get(0).getById(String.valueOf(0), params("distrib", "false")); |
| assertNull("This doc was supposed to have been deleted, but was: " + doc, doc); |
| |
| log.info("reorderedDBQIndividualReplicaTest: This test passed fine..."); |
| clearIndex(); |
| commit(); |
| } |
| |
| private void docValuesUpdateTest() throws Exception { |
| // number of docs we're testing (0 <= id), index may contain additional random docs (id < 0) |
| int numDocs = atLeast(100); |
| if (onlyLeaderIndexes) numDocs = TestUtil.nextInt(random(), 10, 50); |
| log.info("Trying num docs = {}", numDocs); |
| final List<Integer> ids = new ArrayList<Integer>(numDocs); |
| for (int id = 0; id < numDocs; id++) { |
| ids.add(id); |
| } |
| |
| buildRandomIndex(101.0F, ids); |
| |
| List<Integer> luceneDocids = new ArrayList<>(numDocs); |
| List<Number> valuesList = new ArrayList<>(numDocs); |
| SolrParams params = |
| params( |
| "q", |
| "id:[0 TO *]", |
| "fl", |
| "*,[docid]", |
| "rows", |
| String.valueOf(numDocs), |
| "sort", |
| "id_i asc"); |
| SolrDocumentList results = LEADER.query(params).getResults(); |
| assertEquals(numDocs, results.size()); |
| for (SolrDocument doc : results) { |
| luceneDocids.add((Integer) doc.get("[docid]")); |
| valuesList.add((Float) doc.get("inplace_updatable_float")); |
| } |
| log.info("Initial results: {}", results); |
| |
| // before we do any atomic operations, sanity check our results against all clients |
| assertDocIdsAndValuesAgainstAllClients( |
| "sanitycheck", params, luceneDocids, "inplace_updatable_float", valuesList); |
| |
| // now we're going to overwrite the value for all of our testing docs |
| // giving them a value between -5 and +5 |
| for (int id : ids) { |
| // NOTE: in rare cases, this may be setting the value to 0, on a doc that |
| // already had an init value of 0 -- which is an interesting edge case, so we don't exclude it |
| final float multiplier = r.nextBoolean() ? -5.0F : 5.0F; |
| final float value = r.nextFloat() * multiplier; |
| assertTrue(-5.0F <= value && value <= 5.0F); |
| valuesList.set(id, value); |
| } |
| log.info("inplace_updatable_float: {}", valuesList); |
| |
| // update doc w/ set |
| Collections.shuffle(ids, r); // so updates aren't applied in index order |
| for (int id : ids) { |
| index("id", id, "inplace_updatable_float", map("set", valuesList.get(id))); |
| } |
| |
| commit(); |
| |
| assertDocIdsAndValuesAgainstAllClients( |
| "set", |
| SolrParams.wrapDefaults( |
| params( |
| "q", "inplace_updatable_float:[-5.0 TO 5.0]", |
| "fq", "id:[0 TO *]"), |
| // existing sort & fl that we want... |
| params), |
| luceneDocids, |
| "inplace_updatable_float", |
| valuesList); |
| |
| // update doc, w/increment |
| log.info("Updating the documents..."); |
| Collections.shuffle(ids, r); // so updates aren't applied in the same order as our 'set' |
| for (int id : ids) { |
| // all increments will use some value X such that 20 < abs(X) |
| // thus ensuring that after all increments are done, there should be |
| // 0 test docs matching the query inplace_updatable_float:[-10 TO 10] |
| final float inc = (r.nextBoolean() ? -1.0F : 1.0F) * (r.nextFloat() + (float) atLeast(20)); |
| assertTrue(20 < Math.abs(inc)); |
| final float value = (float) valuesList.get(id) + inc; |
| assertTrue(value < -10 || 10 < value); |
| |
| valuesList.set(id, value); |
| index("id", id, "inplace_updatable_float", map("inc", inc)); |
| } |
| commit(); |
| |
| assertDocIdsAndValuesAgainstAllClients( |
| "inc", |
| SolrParams.wrapDefaults( |
| params( |
| "q", "-inplace_updatable_float:[-10.0 TO 10.0]", |
| "fq", "id:[0 TO *]"), |
| // existing sort & fl that we want... |
| params), |
| luceneDocids, |
| "inplace_updatable_float", |
| valuesList); |
| |
| log.info("Updating the documents with new field..."); |
| Collections.shuffle(ids, r); |
| for (int id : ids) { |
| final int val = random().nextInt(20); |
| valuesList.set(id, val); |
| index("id", id, "inplace_updatable_int", map((random().nextBoolean() ? "inc" : "set"), val)); |
| } |
| commit(); |
| |
| assertDocIdsAndValuesAgainstAllClients( |
| "inplace_for_first_field_update", |
| SolrParams.wrapDefaults( |
| params("q", "inplace_updatable_int:[* TO *]", "fq", "id:[0 TO *]"), params), |
| luceneDocids, |
| "inplace_updatable_int", |
| valuesList); |
| log.info("docValuesUpdateTest: This test passed fine..."); |
| } |
| |
| /** Ingest many documents, keep committing. Then update a document from a very old segment. */ |
| private void updatingDVsInAVeryOldSegment() throws Exception { |
| clearIndex(); |
| commit(); |
| |
| String id = String.valueOf(Integer.MAX_VALUE); |
| index("id", id, "inplace_updatable_float", "1", "title_s", "newtitle"); |
| |
| // create 10 more segments |
| for (int i = 0; i < 10; i++) { |
| buildRandomIndex(101.0F, Collections.emptyList()); |
| } |
| |
| index("id", id, "inplace_updatable_float", map("inc", "1")); |
| |
| for (SolrClient client : new SolrClient[] {LEADER, NONLEADERS.get(0), NONLEADERS.get(1)}) { |
| assertEquals("newtitle", client.getById(id).get("title_s")); |
| assertEquals(2.0f, client.getById(id).get("inplace_updatable_float")); |
| } |
| commit(); |
| for (SolrClient client : new SolrClient[] {LEADER, NONLEADERS.get(0), NONLEADERS.get(1)}) { |
| assertEquals("newtitle", client.getById(id).get("title_s")); |
| assertEquals(2.0f, client.getById(id).get("inplace_updatable_float")); |
| } |
| |
| log.info("updatingDVsInAVeryOldSegment: This test passed fine..."); |
| } |
| |
| /** |
| * Test scenario: |
| * |
| * <ul> |
| * <li>Send a batch of documents to one node |
| * <li>Batch consist of an update for document which is existed and an update for documents |
| * which is not existed |
| * <li>Assumption which is made is that both updates will be applied: field for existed document |
| * will be updated, new document will be created for a non existed one |
| * </ul> |
| */ |
| private void updateExistingThenNonExistentDoc() throws Exception { |
| clearIndex(); |
| index("id", 1, "inplace_updatable_float", "1", "title_s", "newtitle"); |
| commit(); |
| SolrInputDocument existingDocUpdate = new SolrInputDocument(); |
| existingDocUpdate.setField("id", 1); |
| existingDocUpdate.setField("inplace_updatable_float", map("set", "50")); |
| |
| SolrInputDocument nonexistentDocUpdate = new SolrInputDocument(); |
| nonexistentDocUpdate.setField("id", 2); |
| nonexistentDocUpdate.setField("inplace_updatable_float", map("set", "50")); |
| |
| List<SolrInputDocument> docs = List.of(existingDocUpdate, nonexistentDocUpdate); |
| |
| SolrClient solrClient = clients.get(random().nextInt(clients.size())); |
| add(solrClient, null, docs); |
| commit(); |
| for (SolrClient client : new SolrClient[] {LEADER, NONLEADERS.get(0), NONLEADERS.get(1)}) { |
| for (SolrInputDocument expectDoc : docs) { |
| String docId = expectDoc.getFieldValue("id").toString(); |
| SolrDocument actualDoc = client.getById(docId); |
| assertNotNull("expected to get doc by id:" + docId, actualDoc); |
| assertEquals( |
| "expected to update " + actualDoc, 50.0f, actualDoc.get("inplace_updatable_float")); |
| } |
| } |
| } |
| |
| /** |
| * Retries the specified 'req' against each SolrClient in "clients" until the expected number of |
| * results are returned, at which point the results are verified using |
| * assertDocIdsAndValuesInResults |
| * |
| * @param debug used in log and assertion messages |
| * @param req the query to execute, should include rows & sort params such that the results |
| * can be compared to luceneDocids and valuesList |
| * @param luceneDocids a list of "[docid]" values to be tested against each doc in the req results |
| * (in order) |
| * @param fieldName used to get value from the doc to validate with valuesList |
| * @param valuesList a list of given fieldName values to be tested against each doc in results (in |
| * order) |
| */ |
| private void assertDocIdsAndValuesAgainstAllClients( |
| final String debug, |
| final SolrParams req, |
| final List<Integer> luceneDocids, |
| final String fieldName, |
| final List<Number> valuesList) |
| throws Exception { |
| assertEquals(luceneDocids.size(), valuesList.size()); |
| final long numFoundExpected = luceneDocids.size(); |
| |
| for (SolrClient client : clients) { |
| final String clientDebug = |
| client.toString() + (LEADER.equals(client) ? " (leader)" : " (not leader)"); |
| final String msg = "'" + debug + "' results against client: " + clientDebug; |
| SolrDocumentList results = null; |
| // For each client, do a (sorted) sanity check query to confirm searcher has been re-opened |
| // after our update -- if the numFound matches our expectations, then verify the inplace float |
| // value and [docid] of each result doc against our expectations to ensure that the values |
| // were |
| // updated properly w/o the doc being completely re-added internally. (ie: truly inplace) |
| RETRY: |
| for (int attempt = 0; attempt <= NUM_RETRIES; attempt++) { |
| log.info("Attempt #{} checking {}", attempt, msg); |
| results = client.query(req).getResults(); |
| if (numFoundExpected == results.getNumFound()) { |
| break RETRY; |
| } |
| if (attempt == NUM_RETRIES) { |
| fail( |
| "Repeated retry for " |
| + msg |
| + "; Never got numFound=" |
| + numFoundExpected |
| + "; results=> " |
| + results); |
| } |
| log.info("numFound mismatch, searcher may not have re-opened yet. Will sleep an retry..."); |
| Thread.sleep(WAIT_TIME); |
| } |
| |
| assertDocIdsAndValuesInResults(msg, results, luceneDocids, fieldName, valuesList); |
| } |
| } |
| |
| /** |
| * Given a result list sorted by "id", asserts that the "[docid]" and "inplace_updatable_float" |
| * values for each document match in order. |
| * |
| * @param msgPre used as a prefix for assertion messages |
| * @param results the sorted results of some query, such that all matches are included (ie: rows = |
| * numFound) |
| * @param luceneDocids a list of "[docid]" values to be tested against each doc in results (in |
| * order) |
| * @param fieldName used to get value from the doc to validate with valuesList |
| * @param valuesList a list of given fieldName values to be tested against each doc in results (in |
| * order) |
| */ |
| private void assertDocIdsAndValuesInResults( |
| final String msgPre, |
| final SolrDocumentList results, |
| final List<Integer> luceneDocids, |
| final String fieldName, |
| final List<Number> valuesList) { |
| |
| assertEquals(luceneDocids.size(), valuesList.size()); |
| assertEquals( |
| msgPre |
| + ": rows param wasn't big enough, we need to compare all results matching the query", |
| results.getNumFound(), |
| results.size()); |
| assertEquals( |
| msgPre + ": didn't get a result for every known docid", |
| luceneDocids.size(), |
| results.size()); |
| |
| for (SolrDocument doc : results) { |
| final int id = Integer.parseInt(doc.get("id").toString()); |
| final Object val = doc.get(fieldName); |
| final Object docid = doc.get("[docid]"); |
| assertEquals(msgPre + " wrong val for " + doc, valuesList.get(id), val); |
| assertEquals(msgPre + " wrong [docid] for " + doc, luceneDocids.get(id), docid); |
| } |
| } |
| |
| private void ensureRtgWorksWithPartialUpdatesTest() throws Exception { |
| clearIndex(); |
| commit(); |
| |
| float inplace_updatable_float = 1; |
| String title = "title100"; |
| long version = 0, currentVersion; |
| |
| currentVersion = buildRandomIndex(100).get(0); |
| assertTrue(currentVersion > version); |
| |
| // do an initial (non-inplace) update to ensure both the float & int fields we care about have |
| // (any) value that way all subsequent atomic updates will be inplace |
| currentVersion = |
| addDocAndGetVersion( |
| "id", 100, |
| "inplace_updatable_float", map("set", r.nextFloat()), |
| "inplace_updatable_int", map("set", r.nextInt())); |
| LEADER.commit(); |
| |
| // get the internal docids of id=100 document from the three replicas |
| List<Integer> docids = getInternalDocIds("100"); |
| |
| // update doc, set |
| currentVersion = |
| addDocAndGetVersion( |
| "id", 100, "inplace_updatable_float", map("set", inplace_updatable_float)); |
| assertTrue(currentVersion > version); |
| version = currentVersion; |
| LEADER.commit(); |
| assertEquals( |
| "Earlier: " + docids + ", now: " + getInternalDocIds("100"), |
| docids, |
| getInternalDocIds("100")); |
| |
| SolrDocument sdoc = LEADER.getById("100"); // RTG straight from the index |
| assertEquals(sdoc.toString(), inplace_updatable_float, sdoc.get("inplace_updatable_float")); |
| assertEquals(sdoc.toString(), title, sdoc.get("title_s")); |
| assertEquals(sdoc.toString(), version, sdoc.get("_version_")); |
| |
| if (r.nextBoolean()) { |
| title = "newtitle100"; |
| currentVersion = |
| addDocAndGetVersion( |
| "id", |
| 100, |
| "title_s", |
| title, |
| "inplace_updatable_float", |
| inplace_updatable_float); // full indexing |
| assertTrue(currentVersion > version); |
| version = currentVersion; |
| |
| sdoc = LEADER.getById("100"); // RTG from the tlog |
| assertEquals(sdoc.toString(), inplace_updatable_float, sdoc.get("inplace_updatable_float")); |
| assertEquals(sdoc.toString(), title, sdoc.get("title_s")); |
| assertEquals(sdoc.toString(), version, sdoc.get("_version_")); |
| |
| // we've done a full index, so we need to update the [docid] for each replica |
| LEADER.commit(); // can't get (real) [docid] from the tlogs, need to force a commit |
| docids = getInternalDocIds("100"); |
| } |
| |
| inplace_updatable_float++; |
| currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_float", map("inc", 1)); |
| assertTrue(currentVersion > version); |
| version = currentVersion; |
| LEADER.commit(); |
| assertEquals( |
| "Earlier: " + docids + ", now: " + getInternalDocIds("100"), |
| docids, |
| getInternalDocIds("100")); |
| |
| currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_int", map("set", "100")); |
| assertTrue(currentVersion > version); |
| version = currentVersion; |
| |
| inplace_updatable_float++; |
| currentVersion = addDocAndGetVersion("id", 100, "inplace_updatable_float", map("inc", 1)); |
| assertTrue(currentVersion > version); |
| version = currentVersion; |
| |
| // set operation with invalid value for field |
| SolrException e = |
| expectThrows( |
| SolrException.class, |
| () -> |
| addDocAndGetVersion( |
| "id", 100, "inplace_updatable_float", map("set", "NOT_NUMBER"))); |
| assertEquals(SolrException.ErrorCode.BAD_REQUEST.code, e.code()); |
| MatcherAssert.assertThat(e.getMessage(), containsString("For input string: \"NOT_NUMBER\"")); |
| |
| // inc operation with invalid inc value |
| e = |
| expectThrows( |
| SolrException.class, |
| () -> |
| addDocAndGetVersion("id", 100, "inplace_updatable_int", map("inc", "NOT_NUMBER"))); |
| assertEquals(SolrException.ErrorCode.BAD_REQUEST.code, e.code()); |
| MatcherAssert.assertThat(e.getMessage(), containsString("For input string: \"NOT_NUMBER\"")); |
| |
| // RTG from tlog(s) |
| for (SolrClient client : clients) { |
| final String clientDebug = |
| client.toString() + (LEADER.equals(client) ? " (leader)" : " (not leader)"); |
| sdoc = client.getById("100", params("distrib", "false")); |
| |
| assertEquals(clientDebug + " => " + sdoc, 100, sdoc.get("inplace_updatable_int")); |
| assertEquals( |
| clientDebug + " => " + sdoc, |
| inplace_updatable_float, |
| sdoc.get("inplace_updatable_float")); |
| assertEquals(clientDebug + " => " + sdoc, title, sdoc.get("title_s")); |
| assertEquals(clientDebug + " => " + sdoc, version, sdoc.get("_version_")); |
| } |
| |
| // assert that the internal docid for id=100 document remains same, in each replica, as before |
| LEADER.commit(); // can't get (real) [docid] from the tlogs, need to force a commit |
| assertEquals( |
| "Earlier: " + docids + ", now: " + getInternalDocIds("100"), |
| docids, |
| getInternalDocIds("100")); |
| |
| log.info("ensureRtgWorksWithPartialUpdatesTest: This test passed fine..."); |
| } |
| |
| /** |
| * Returns the "[docid]" value(s) returned from a non-distrib RTG to each of the clients used in |
| * this test (in the same order as the clients list) |
| */ |
| private List<Integer> getInternalDocIds(String id) throws SolrServerException, IOException { |
| List<Integer> ret = new ArrayList<>(clients.size()); |
| for (SolrClient client : clients) { |
| SolrDocument doc = client.getById(id, params("distrib", "false", "fl", "[docid]")); |
| Object docid = doc.get("[docid]"); |
| assertNotNull(docid); |
| assertEquals(Integer.class, docid.getClass()); |
| ret.add((Integer) docid); |
| } |
| assertEquals(clients.size(), ret.size()); |
| return ret; |
| } |
| |
| private void outOfOrderUpdatesIndividualReplicaTest() throws Exception { |
| clearIndex(); |
| commit(); |
| |
| buildRandomIndex(0); |
| |
| float inplace_updatable_float = 1; |
| // update doc, set |
| index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float)); |
| |
| LEADER.commit(); |
| SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index |
| assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float")); |
| assertEquals("title0", sdoc.get("title_s")); |
| long version0 = (long) sdoc.get("_version_"); |
| |
| // put replica out of sync |
| float newinplace_updatable_float = 100; |
| List<UpdateRequest> updates = new ArrayList<>(); |
| updates.add( |
| simulatedUpdateRequest( |
| null, |
| "id", |
| 0, |
| "title_s", |
| "title0_new", |
| "inplace_updatable_float", |
| newinplace_updatable_float, |
| "_version_", |
| version0 + 1)); // full update |
| for (int i = 1; i < atLeast(3); i++) { |
| updates.add( |
| simulatedUpdateRequest( |
| version0 + i, |
| "id", |
| 0, |
| "inplace_updatable_float", |
| newinplace_updatable_float + i, |
| "_version_", |
| version0 + i + 1)); |
| } |
| |
| // order the updates correctly for NONLEADER 1 |
| for (UpdateRequest update : updates) { |
| if (log.isInfoEnabled()) { |
| log.info("Issuing well ordered update: {}", update.getDocuments()); |
| } |
| NONLEADERS.get(1).request(update); |
| } |
| |
| // Reordering needs to happen using parallel threads, since some of these updates will |
| // be blocking calls, waiting for some previous update operations to arrive on which it depends. |
| ExecutorService threadpool = |
| ExecutorUtil.newMDCAwareFixedThreadPool( |
| updates.size() + 1, new SolrNamedThreadFactory(getTestName())); |
| |
| // re-order the updates for NONLEADER 0 |
| List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates); |
| Collections.shuffle(reorderedUpdates, r); |
| List<Future<UpdateResponse>> updateResponses = new ArrayList<>(); |
| for (UpdateRequest update : reorderedUpdates) { |
| AsyncUpdateWithRandomCommit task = |
| new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong()); |
| updateResponses.add(threadpool.submit(task)); |
| // while we can't guarantee/trust what order the updates are executed in, since multiple |
| // threads are involved, but we're trying to bias the thread scheduling to run them in the |
| // order submitted |
| Thread.sleep(10); |
| } |
| |
| threadpool.shutdown(); |
| assertTrue( |
| "Thread pool didn't terminate within 15 secs", |
| threadpool.awaitTermination(15, TimeUnit.SECONDS)); |
| |
| // assert all requests were successful |
| for (Future<UpdateResponse> resp : updateResponses) { |
| assertEquals(0, resp.get().getStatus()); |
| } |
| |
| // assert both replicas have same effect |
| for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica |
| if (log.isInfoEnabled()) { |
| log.info("Testing client: {}", ((HttpSolrClient) client).getBaseURL()); |
| } |
| assertReplicaValue( |
| client, |
| 0, |
| "inplace_updatable_float", |
| (newinplace_updatable_float + (float) (updates.size() - 1)), |
| "inplace_updatable_float didn't match for replica at client: " |
| + ((HttpSolrClient) client).getBaseURL()); |
| assertReplicaValue( |
| client, |
| 0, |
| "title_s", |
| "title0_new", |
| "Title didn't match for replica at client: " + ((HttpSolrClient) client).getBaseURL()); |
| assertEquals(version0 + updates.size(), getReplicaValue(client, 0, "_version_")); |
| } |
| |
| log.info("outOfOrderUpdatesIndividualReplicaTest: This test passed fine..."); |
| } |
| |
| // The following should work: full update to doc 0, in-place update for doc 0, delete doc 0 |
| private void reorderedDeletesTest() throws Exception { |
| |
| clearIndex(); |
| commit(); |
| |
| buildRandomIndex(0); |
| |
| float inplace_updatable_float = 1; |
| // update doc, set |
| index("id", 0, "inplace_updatable_float", map("set", inplace_updatable_float)); |
| |
| LEADER.commit(); |
| SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index |
| assertEquals(inplace_updatable_float, sdoc.get("inplace_updatable_float")); |
| assertEquals("title0", sdoc.get("title_s")); |
| long version0 = (long) sdoc.get("_version_"); |
| |
| // put replica out of sync |
| float newinplace_updatable_float = 100; |
| List<UpdateRequest> updates = new ArrayList<>(); |
| updates.add( |
| simulatedUpdateRequest( |
| null, |
| "id", |
| 0, |
| "title_s", |
| "title0_new", |
| "inplace_updatable_float", |
| newinplace_updatable_float, |
| "_version_", |
| version0 + 1)); // full update |
| updates.add( |
| simulatedUpdateRequest( |
| version0 + 1, |
| "id", |
| 0, |
| "inplace_updatable_float", |
| newinplace_updatable_float + 1, |
| "_version_", |
| version0 + 2)); // inplace_updatable_float=101 |
| updates.add(simulatedDeleteRequest(0, version0 + 3)); |
| |
| // order the updates correctly for NONLEADER 1 |
| for (UpdateRequest update : updates) { |
| if (log.isInfoEnabled()) { |
| log.info("Issuing well ordered update: {}", update.getDocuments()); |
| } |
| NONLEADERS.get(1).request(update); |
| } |
| |
| // Reordering needs to happen using parallel threads |
| ExecutorService threadpool = |
| ExecutorUtil.newMDCAwareFixedThreadPool( |
| updates.size() + 1, new SolrNamedThreadFactory(getTestName())); |
| |
| // re-order the updates for NONLEADER 0 |
| List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates); |
| Collections.shuffle(reorderedUpdates, r); |
| List<Future<UpdateResponse>> updateResponses = new ArrayList<>(); |
| for (UpdateRequest update : reorderedUpdates) { |
| AsyncUpdateWithRandomCommit task = |
| new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong()); |
| updateResponses.add(threadpool.submit(task)); |
| // while we can't guarantee/trust what order the updates are executed in, since multiple |
| // threads are involved, but we're trying to bias the thread scheduling to run them in the |
| // order submitted |
| Thread.sleep(10); |
| } |
| |
| threadpool.shutdown(); |
| assertTrue( |
| "Thread pool didn't terminate within 15 secs", |
| threadpool.awaitTermination(15, TimeUnit.SECONDS)); |
| |
| // assert all requests were successful |
| for (Future<UpdateResponse> resp : updateResponses) { |
| assertEquals(0, resp.get().getStatus()); |
| } |
| |
| // assert both replicas have same effect |
| for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica |
| SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false")); |
| assertNull("This doc was supposed to have been deleted, but was: " + doc, doc); |
| } |
| |
| log.info("reorderedDeletesTest: This test passed fine..."); |
| } |
| |
| /* Test for a situation when a document requiring in-place update cannot be "resurrected" |
| * when the original full indexed document has been deleted by an out of order DBQ. |
| * Expected behaviour in this case should be to throw the replica into LIR (since this will |
| * be rare). Here's an example of the situation: |
| ADD(id=x, val=5, ver=1) |
| UPD(id=x, val=10, ver = 2) |
| DBQ(q=val:10, v=4) |
| DV(id=x, val=5, ver=3) |
| */ |
| private void reorderedDBQsResurrectionTest() throws Exception { |
| if (onlyLeaderIndexes) { |
| log.info("RTG with DBQs are not working in tlog replicas"); |
| return; |
| } |
| clearIndex(); |
| commit(); |
| |
| buildRandomIndex(0); |
| |
| SolrDocument sdoc = LEADER.getById("0"); // RTG straight from the index |
| // assertEquals(value, sdoc.get("inplace_updatable_float")); |
| assertEquals("title0", sdoc.get("title_s")); |
| long version0 = (long) sdoc.get("_version_"); |
| |
| String field = "inplace_updatable_int"; |
| |
| // put replica out of sync |
| List<UpdateRequest> updates = new ArrayList<>(); |
| updates.add( |
| simulatedUpdateRequest( |
| null, |
| "id", |
| 0, |
| "title_s", |
| "title0_new", |
| field, |
| 5, |
| "_version_", |
| version0 + 1)); // full update |
| updates.add( |
| simulatedUpdateRequest( |
| version0 + 1, |
| "id", |
| 0, |
| field, |
| 10, |
| "_version_", |
| version0 + 2)); // inplace_updatable_float=101 |
| updates.add( |
| simulatedUpdateRequest( |
| version0 + 2, |
| "id", |
| 0, |
| field, |
| 5, |
| "_version_", |
| version0 + 3)); // inplace_updatable_float=101 |
| updates.add( |
| simulatedDeleteRequest(field + ":10", version0 + 4)); // supposed to not delete anything |
| |
| // order the updates correctly for NONLEADER 1 |
| for (UpdateRequest update : updates) { |
| if (log.isInfoEnabled()) { |
| log.info("Issuing well ordered update: {}", update.getDocuments()); |
| } |
| NONLEADERS.get(1).request(update); |
| } |
| |
| // Reordering needs to happen using parallel threads |
| ExecutorService threadpool = |
| ExecutorUtil.newMDCAwareFixedThreadPool( |
| updates.size() + 1, new SolrNamedThreadFactory(getTestName())); |
| // re-order the last two updates for NONLEADER 0 |
| List<UpdateRequest> reorderedUpdates = new ArrayList<>(updates); |
| Collections.swap(reorderedUpdates, 2, 3); |
| |
| List<Future<UpdateResponse>> updateResponses = new ArrayList<>(); |
| for (UpdateRequest update : reorderedUpdates) { |
| // pretend as this update is coming from the other non-leader, so that |
| // the resurrection can happen from there (instead of the leader) |
| update.setParam( |
| DistributedUpdateProcessor.DISTRIB_FROM, |
| ((HttpSolrClient) NONLEADERS.get(1)).getBaseURL() |
| + "/" |
| + NONLEADERS.get(1).getDefaultCollection()); |
| AsyncUpdateWithRandomCommit task = |
| new AsyncUpdateWithRandomCommit(update, NONLEADERS.get(0), random().nextLong()); |
| updateResponses.add(threadpool.submit(task)); |
| // while we can't guarantee/trust what order the updates are executed in, since multiple |
| // threads are involved, but we're trying to bias the thread scheduling to run them in the |
| // order submitted |
| Thread.sleep(10); |
| } |
| |
| threadpool.shutdown(); |
| assertTrue( |
| "Thread pool didn't terminate within 15 secs", |
| threadpool.awaitTermination(15, TimeUnit.SECONDS)); |
| |
| int successful = 0; |
| for (Future<UpdateResponse> resp : updateResponses) { |
| try { |
| UpdateResponse r = resp.get(); |
| if (r.getStatus() == 0) { |
| successful++; |
| } |
| } catch (Exception ex) { |
| // reordered DBQ should trigger an error, thus throwing the replica into LIR. |
| // the cause of the error is that the full document was deleted by mistake due to the |
| // out of order DBQ, and the in-place update that arrives after the DBQ (but was supposed to |
| // arrive before) cannot be applied, since the full document can't now be "resurrected". |
| |
| if (!ex.getMessage() |
| .contains( |
| "Tried to fetch missing update" |
| + " from the leader, but missing wasn't present at leader.")) { |
| throw ex; |
| } |
| } |
| } |
| // All should succeed, i.e. no LIR |
| assertEquals(updateResponses.size(), successful); |
| |
| if (log.isInfoEnabled()) { |
| log.info("Non leader 0: {}", ((HttpSolrClient) NONLEADERS.get(0)).getBaseURL()); |
| log.info("Non leader 1: {}", ((HttpSolrClient) NONLEADERS.get(1)).getBaseURL()); // nowarn |
| } |
| |
| SolrDocument doc0 = NONLEADERS.get(0).getById(String.valueOf(0), params("distrib", "false")); |
| SolrDocument doc1 = NONLEADERS.get(1).getById(String.valueOf(0), params("distrib", "false")); |
| |
| log.info("Doc in both replica 0: {}", doc0); |
| log.info("Doc in both replica 1: {}", doc1); |
| // assert both replicas have same effect |
| for (SolrClient client : NONLEADERS) { // 0th is re-ordered replica, 1st is well-ordered replica |
| SolrDocument doc = client.getById(String.valueOf(0), params("distrib", "false")); |
| assertNotNull("Client: " + ((HttpSolrClient) client).getBaseURL(), doc); |
| assertEquals( |
| "Client: " + ((HttpSolrClient) client).getBaseURL(), 5, doc.getFieldValue(field)); |
| } |
| |
| log.info("reorderedDBQsResurrectionTest: This test passed fine..."); |
| clearIndex(); |
| commit(); |
| } |
| |
| private void delayedReorderingFetchesMissingUpdateFromLeaderTest() throws Exception { |
| clearIndex(); |
| commit(); |
| |
| float inplace_updatable_float = 1F; |
| buildRandomIndex(inplace_updatable_float, Collections.singletonList(1)); |
| |
| float newinplace_updatable_float = 100F; |
| List<UpdateRequest> updates = new ArrayList<>(); |
| updates.add( |
| regularUpdateRequest( |
| "id", |
| 1, |
| "title_s", |
| "title1_new", |
| "id_i", |
| 1, |
| "inplace_updatable_float", |
| newinplace_updatable_float)); |
| updates.add(regularUpdateRequest("id", 1, "inplace_updatable_float", map("inc", 1))); |
| updates.add(regularUpdateRequest("id", 1, "inplace_updatable_float", map("inc", 1))); |
| |
| // The next request to replica2 will be delayed (timeout is 5s) |
| shardToJetty |
| .get(SHARD1) |
| .get(1) |
| .jetty |
| .getDebugFilter() |
| .addDelay("Waiting for dependant update to timeout", 1, 6000); |
| |
| ExecutorService threadpool = |
| ExecutorUtil.newMDCAwareFixedThreadPool( |
| updates.size() + 1, new SolrNamedThreadFactory(getTestName())); |
| for (UpdateRequest update : updates) { |
| AsyncUpdateWithRandomCommit task = |
| new AsyncUpdateWithRandomCommit(update, cloudClient, random().nextLong()); |
| threadpool.submit(task); |
| |
| // while we can't guarantee/trust what order the updates are executed in, since multiple |
| // threads are involved, but we're trying to bias the thread scheduling to run them in the |
| // order submitted |
| Thread.sleep(100); |
| } |
| |
| threadpool.shutdown(); |
| assertTrue( |
| "Thread pool didn't terminate within 15 secs", |
| threadpool.awaitTermination(15, TimeUnit.SECONDS)); |
| |
| commit(); |
| |
| // TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in |
| // Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't) |
| for (int i = 0; i < 100; i++) { |
| Thread.sleep(10); |
| ZkStateReader.from(cloudClient).forceUpdateCollection(DEFAULT_COLLECTION); |
| ClusterState state = cloudClient.getClusterState(); |
| |
| int numActiveReplicas = 0; |
| for (Replica rep : state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) |
| if (rep.getState().equals(Replica.State.ACTIVE)) numActiveReplicas++; |
| |
| assertEquals( |
| "The replica receiving reordered updates must not have gone down", 3, numActiveReplicas); |
| } |
| |
| for (SolrClient client : clients) { |
| TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| try { |
| timeout.waitFor( |
| "Timeout", |
| () -> { |
| try { |
| return (float) getReplicaValue(client, 1, "inplace_updatable_float") |
| == newinplace_updatable_float + 2.0f; |
| } catch (SolrServerException e) { |
| throw new RuntimeException(e); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| } catch (TimeoutException e) { |
| |
| } |
| } |
| |
| for (SolrClient client : clients) { |
| if (log.isInfoEnabled()) { |
| log.info("Testing client (Fetch missing test): {}", ((HttpSolrClient) client).getBaseURL()); |
| log.info( |
| "Version at {} is: {}", |
| ((HttpSolrClient) client).getBaseURL(), |
| getReplicaValue(client, 1, "_version_")); // nowarn |
| } |
| assertReplicaValue( |
| client, |
| 1, |
| "inplace_updatable_float", |
| (newinplace_updatable_float + 2.0f), |
| "inplace_updatable_float didn't match for replica at client: " |
| + ((HttpSolrClient) client).getBaseURL()); |
| assertReplicaValue( |
| client, |
| 1, |
| "title_s", |
| "title1_new", |
| "Title didn't match for replica at client: " + ((HttpSolrClient) client).getBaseURL()); |
| } |
| |
| // Try another round of these updates, this time with a delete request at the end. |
| // This is to ensure that the fetch missing update from leader doesn't bomb out if the |
| // document has been deleted on the leader later on |
| { |
| clearIndex(); |
| commit(); |
| shardToJetty.get(SHARD1).get(1).jetty.getDebugFilter().unsetDelay(); |
| |
| updates.add(regularDeleteRequest(1)); |
| |
| shardToJetty |
| .get(SHARD1) |
| .get(1) |
| .jetty |
| .getDebugFilter() |
| .addDelay("Waiting for dependant update to timeout", 1, 5999); // the first update |
| shardToJetty |
| .get(SHARD1) |
| .get(1) |
| .jetty |
| .getDebugFilter() |
| .addDelay("Waiting for dependant update to timeout", 4, 5998); // the delete update |
| |
| threadpool = |
| ExecutorUtil.newMDCAwareFixedThreadPool( |
| updates.size() + 1, new SolrNamedThreadFactory(getTestName())); |
| for (UpdateRequest update : updates) { |
| AsyncUpdateWithRandomCommit task = |
| new AsyncUpdateWithRandomCommit(update, cloudClient, random().nextLong()); |
| threadpool.submit(task); |
| |
| // while we can't guarantee/trust what order the updates are executed in, since multiple |
| // threads are involved, but we're trying to bias the thread scheduling to run them in the |
| // order submitted |
| Thread.sleep(100); |
| } |
| |
| threadpool.shutdown(); |
| assertTrue( |
| "Thread pool didn't terminate within 15 secs", |
| threadpool.awaitTermination(15, TimeUnit.SECONDS)); |
| |
| commit(); |
| |
| try (ZkShardTerms zkShardTerms = |
| new ZkShardTerms( |
| DEFAULT_COLLECTION, SHARD1, ZkStateReader.from(cloudClient).getZkClient())) { |
| for (int i = 0; i < 100; i++) { |
| Thread.sleep(10); |
| ZkStateReader.from(cloudClient).forceUpdateCollection(DEFAULT_COLLECTION); |
| ClusterState state = cloudClient.getClusterState(); |
| |
| int numActiveReplicas = 0; |
| for (Replica rep : |
| state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) { |
| assertTrue(zkShardTerms.canBecomeLeader(rep.getName())); |
| if (rep.getState().equals(Replica.State.ACTIVE)) numActiveReplicas++; |
| } |
| assertEquals( |
| "The replica receiving reordered updates must not have gone down", |
| 3, |
| numActiveReplicas); |
| } |
| } |
| |
| for (SolrClient client : |
| new SolrClient[] { |
| LEADER, NONLEADERS.get(0), NONLEADERS.get(1) |
| }) { // nonleader 0 re-ordered replica, nonleader 1 well-ordered replica |
| SolrDocument doc = client.getById(String.valueOf(1), params("distrib", "false")); |
| assertNull("This doc was supposed to have been deleted, but was: " + doc, doc); |
| } |
| } |
| log.info("delayedReorderingFetchesMissingUpdateFromLeaderTest: This test passed fine..."); |
| } |
| |
| /** |
| * Use the schema API to verify that the specified expected Field exists with those exact |
| * attributes. |
| */ |
| public void checkExpectedSchemaField(Map<String, Object> expected) throws Exception { |
| String fieldName = (String) expected.get("name"); |
| assertNotNull("expected contains no name: " + expected, fieldName); |
| FieldResponse rsp = new Field(fieldName).process(this.cloudClient); |
| assertNotNull("Field Null Response: " + fieldName, rsp); |
| assertEquals("Field Status: " + fieldName + " => " + rsp, 0, rsp.getStatus()); |
| assertEquals("Field: " + fieldName, expected, rsp.getField()); |
| } |
| |
| private class AsyncUpdateWithRandomCommit implements Callable<UpdateResponse> { |
| UpdateRequest update; |
| SolrClient solrClient; |
| final Random rnd; |
| int commitBound = onlyLeaderIndexes ? 50 : 3; |
| |
| public AsyncUpdateWithRandomCommit(UpdateRequest update, SolrClient solrClient, long seed) { |
| this.update = update; |
| this.solrClient = solrClient; |
| this.rnd = new Random(seed); |
| } |
| |
| @Override |
| public UpdateResponse call() throws Exception { |
| UpdateResponse resp = update.process(solrClient); // solrClient.request(update); |
| if (rnd.nextInt(commitBound) == 0) solrClient.commit(); |
| return resp; |
| } |
| } |
| |
| Object getReplicaValue(SolrClient client, int doc, String field) |
| throws SolrServerException, IOException { |
| SolrDocument sdoc = client.getById(String.valueOf(doc), params("distrib", "false")); |
| return sdoc == null ? null : sdoc.get(field); |
| } |
| |
| void assertReplicaValue(SolrClient client, int doc, String field, Object expected, String message) |
| throws SolrServerException, IOException { |
| assertEquals(message, expected, getReplicaValue(client, doc, field)); |
| } |
| |
| // This returns an UpdateRequest with the given fields that represent a document. |
| // This request is constructed such that it is a simulation of a request coming from |
| // a leader to a replica. |
| UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws IOException { |
| SolrInputDocument doc = sdoc(fields); |
| |
| // get baseUrl of the leader |
| String baseUrl = getBaseUrl(doc.get("id").toString()); |
| |
| UpdateRequest ur = new UpdateRequest(); |
| ur.add(doc); |
| ur.setParam("update.distrib", "FROMLEADER"); |
| if (prevVersion != null) { |
| ur.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion)); |
| ur.setParam("distrib.inplace.update", "true"); |
| } |
| ur.setParam("distrib.from", baseUrl); |
| return ur; |
| } |
| |
| UpdateRequest simulatedDeleteRequest(int id, long version) throws IOException { |
| String baseUrl = getBaseUrl("" + id); |
| |
| UpdateRequest ur = new UpdateRequest(); |
| if (random().nextBoolean() || onlyLeaderIndexes) { |
| ur.deleteById("" + id); |
| } else { |
| ur.deleteByQuery("id:" + id); |
| } |
| ur.setParam("_version_", "" + version); |
| ur.setParam("update.distrib", "FROMLEADER"); |
| ur.setParam("distrib.from", baseUrl); |
| return ur; |
| } |
| |
| UpdateRequest simulatedDeleteRequest(String query, long version) { |
| String baseUrl = getBaseUrl((HttpSolrClient) LEADER); |
| |
| UpdateRequest ur = new UpdateRequest(); |
| ur.deleteByQuery(query); |
| ur.setParam("_version_", "" + version); |
| ur.setParam("update.distrib", "FROMLEADER"); |
| ur.setParam("distrib.from", baseUrl + DEFAULT_COLLECTION + "/"); |
| return ur; |
| } |
| |
| private String getBaseUrl(String id) { |
| DocCollection collection = cloudClient.getClusterState().getCollection(DEFAULT_COLLECTION); |
| Slice slice = collection.getRouter().getTargetSlice(id, null, null, null, collection); |
| return slice.getLeader().getCoreUrl(); |
| } |
| |
| protected String getBaseUrl(HttpSolrClient client) { |
| // take a complete Solr url that ends with /collection1 and truncates it to the root url |
| // that is used for admin api calls. |
| return client |
| .getBaseURL() |
| .substring(0, client.getBaseURL().length() - DEFAULT_COLLECTION.length() - 1); |
| } |
| |
| UpdateRequest regularUpdateRequest(Object... fields) { |
| UpdateRequest ur = new UpdateRequest(); |
| SolrInputDocument doc = sdoc(fields); |
| ur.add(doc); |
| return ur; |
| } |
| |
| UpdateRequest regularDeleteRequest(int id) { |
| UpdateRequest ur = new UpdateRequest(); |
| ur.deleteById("" + id); |
| return ur; |
| } |
| |
| UpdateRequest regularDeleteByQueryRequest(String q) { |
| UpdateRequest ur = new UpdateRequest(); |
| ur.deleteByQuery(q); |
| return ur; |
| } |
| |
| protected long addDocAndGetVersion(Object... fields) throws Exception { |
| SolrInputDocument doc = new SolrInputDocument(); |
| addFields(doc, fields); |
| |
| UpdateRequest ureq = new UpdateRequest(); |
| ureq.setParam("versions", "true"); |
| ureq.add(doc); |
| UpdateResponse resp; |
| |
| // send updates to leader, to avoid SOLR-8733 |
| resp = ureq.process(LEADER); |
| |
| long returnedVersion = |
| Long.parseLong(((NamedList<?>) resp.getResponse().get("adds")).getVal(0).toString()); |
| assertTrue( |
| "Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully" |
| + " worked around that problem here.", |
| returnedVersion > 0); |
| return returnedVersion; |
| } |
| |
| /** |
| * Convenience method variant that never uses <code>initFloat</code> |
| * |
| * @see #buildRandomIndex(Float,List) |
| */ |
| protected List<Long> buildRandomIndex(Integer... specialIds) throws Exception { |
| return buildRandomIndex(null, Arrays.asList(specialIds)); |
| } |
| |
| /** |
| * Helper method to build a randomized index with the fields needed for all test methods in this |
| * class. At a minimum, this index will contain 1 doc per "special" (non-negative) document id. |
| * These special documents will be added with the <code>initFloat</code> specified in the |
| * "inplace_updatable_float" field. |
| * |
| * <p>A random number of documents (with negative ids) will be indexed in between each of the |
| * "special" documents, as well as before/after the first/last special document. |
| * |
| * @param initFloat Value to use in the "inplace_updatable_float" for the special documents; will |
| * never be used if null |
| * @param specialIds The ids to use for the special documents, all values must be non-negative |
| * @return the versions of each of the specials document returned when indexing it |
| */ |
| protected List<Long> buildRandomIndex(Float initFloat, List<Integer> specialIds) |
| throws Exception { |
| |
| int id = -1; // used for non-special docs |
| final int numPreDocs = |
| rarely() || onlyLeaderIndexes ? TestUtil.nextInt(random(), 0, 9) : atLeast(10); |
| for (int i = 1; i <= numPreDocs; i++) { |
| addDocAndGetVersion("id", id, "title_s", "title" + id, "id_i", id); |
| id--; |
| } |
| final List<Long> versions = new ArrayList<>(specialIds.size()); |
| for (int special : specialIds) { |
| if (null == initFloat) { |
| versions.add( |
| addDocAndGetVersion("id", special, "title_s", "title" + special, "id_i", special)); |
| } else { |
| versions.add( |
| addDocAndGetVersion( |
| "id", |
| special, |
| "title_s", |
| "title" + special, |
| "id_i", |
| special, |
| "inplace_updatable_float", |
| initFloat)); |
| } |
| final int numPostDocs = |
| rarely() || onlyLeaderIndexes ? TestUtil.nextInt(random(), 0, 2) : atLeast(10); |
| for (int i = 1; i <= numPostDocs; i++) { |
| addDocAndGetVersion("id", id, "title_s", "title" + id, "id_i", id); |
| id--; |
| } |
| } |
| LEADER.commit(); |
| |
| assertEquals(specialIds.size(), versions.size()); |
| return versions; |
| } |
| |
| /* |
| * Situation: |
| * add(id=1,inpfield=12,title=mytitle,version=1) |
| * inp(id=1,inpfield=13,prevVersion=1,version=2) // timeout indefinitely |
| * inp(id=1,inpfield=14,prevVersion=2,version=3) // will wait till timeout, and then fetch a "not found" from leader |
| * dbq("inp:14",version=4) |
| */ |
| private void reorderedDBQsUsingUpdatedValueFromADroppedUpdate() throws Exception { |
| if (onlyLeaderIndexes) { |
| log.info("RTG with DBQs are not working in tlog replicas"); |
| return; |
| } |
| clearIndex(); |
| commit(); |
| |
| float inplace_updatable_float = 1F; |
| buildRandomIndex(inplace_updatable_float, Collections.singletonList(1)); |
| |
| List<UpdateRequest> updates = new ArrayList<>(); |
| updates.add( |
| regularUpdateRequest( |
| "id", 1, "id_i", 1, "inplace_updatable_float", 12, "title_s", "mytitle")); |
| updates.add( |
| regularUpdateRequest( |
| "id", 1, "inplace_updatable_float", map("inc", 1))); // delay indefinitely |
| updates.add(regularUpdateRequest("id", 1, "inplace_updatable_float", map("inc", 1))); |
| updates.add(regularDeleteByQueryRequest("inplace_updatable_float:14")); |
| |
| // The second request will be delayed very, very long, so that the next update actually gives up |
| // waiting for this and fetches a full update from the leader. |
| shardToJetty |
| .get(SHARD1) |
| .get(1) |
| .jetty |
| .getDebugFilter() |
| .addDelay("Waiting for dependant update to timeout", 2, 8000); |
| |
| ExecutorService threadpool = |
| ExecutorUtil.newMDCAwareFixedThreadPool( |
| updates.size() + 1, new SolrNamedThreadFactory(getTestName())); |
| for (UpdateRequest update : updates) { |
| AsyncUpdateWithRandomCommit task = |
| new AsyncUpdateWithRandomCommit(update, cloudClient, random().nextLong()); |
| threadpool.submit(task); |
| |
| // while we can't guarantee/trust what order the updates are executed in, since multiple |
| // threads are involved, but we're trying to bias the thread scheduling to run them in the |
| // order submitted |
| Thread.sleep(100); |
| } |
| |
| threadpool.shutdown(); |
| assertTrue( |
| "Thread pool didn't terminate within 12 secs", |
| threadpool.awaitTermination(12, TimeUnit.SECONDS)); |
| |
| commit(); |
| |
| // TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in |
| // Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't) |
| for (int i = 0; i < 100; i++) { |
| Thread.sleep(10); |
| ZkStateReader.from(cloudClient).forceUpdateCollection(DEFAULT_COLLECTION); |
| ClusterState state = cloudClient.getClusterState(); |
| |
| int numActiveReplicas = 0; |
| for (Replica rep : state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) |
| if (rep.getState().equals(Replica.State.ACTIVE)) numActiveReplicas++; |
| |
| assertEquals( |
| "The replica receiving reordered updates must not have gone down", 3, numActiveReplicas); |
| } |
| |
| for (SolrClient client : clients) { |
| if (log.isInfoEnabled()) { |
| log.info( |
| "Testing client (testDBQUsingUpdatedFieldFromDroppedUpdate): {}", |
| ((HttpSolrClient) client).getBaseURL()); |
| log.info( |
| "Version at {} is: {}", |
| ((HttpSolrClient) client).getBaseURL(), |
| getReplicaValue(client, 1, "_version_")); // nowarn |
| } |
| assertNull(client.getById("1", params("distrib", "false"))); |
| } |
| |
| log.info("reorderedDBQsUsingUpdatedValueFromADroppedUpdate: This test passed fine..."); |
| } |
| |
| @Override |
| public void clearIndex() { |
| super.clearIndex(); |
| try { |
| for (SolrClient client : new SolrClient[] {LEADER, NONLEADERS.get(0), NONLEADERS.get(1)}) { |
| if (client != null) { |
| client.request(simulatedDeleteRequest("*:*", -Long.MAX_VALUE)); |
| client.commit(); |
| } |
| } |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |