| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| import com.carrotsearch.randomizedtesting.annotations.Repeat; |
| import com.codahale.metrics.Meter; |
| import org.apache.http.HttpResponse; |
| import org.apache.http.client.ClientProtocolException; |
| import org.apache.http.client.HttpClient; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.client.methods.HttpPost; |
| import org.apache.http.entity.StringEntity; |
| import org.apache.lucene.index.IndexWriter; |
| import org.apache.lucene.util.LuceneTestCase.Slow; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.SolrQuery; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.client.solrj.embedded.JettySolrRunner; |
| import org.apache.solr.client.solrj.impl.CloudSolrClient; |
| import org.apache.solr.client.solrj.impl.HttpSolrClient; |
| import org.apache.solr.client.solrj.request.CollectionAdminRequest; |
| import org.apache.solr.client.solrj.request.QueryRequest; |
| import org.apache.solr.client.solrj.request.UpdateRequest; |
| import org.apache.solr.client.solrj.response.CollectionAdminResponse; |
| import org.apache.solr.client.solrj.response.QueryResponse; |
| import org.apache.solr.common.SolrDocument; |
| import org.apache.solr.common.SolrDocumentList; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.CollectionStatePredicate; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.params.CollectionParams; |
| import org.apache.solr.common.params.ModifiableSolrParams; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.core.SolrCore; |
| import org.apache.solr.update.SolrIndexWriter; |
| import org.apache.solr.util.RefCounted; |
| import org.apache.solr.util.TestInjection; |
| import org.apache.solr.util.TimeOut; |
| import org.apache.zookeeper.KeeperException; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @Slow |
| public class TestTlogReplica extends SolrCloudTestCase { |
| |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| private String collectionName = null; |
| private final static int REPLICATION_TIMEOUT_SECS = 10; |
| |
| private String suggestedCollectionName() { |
| return (getTestClass().getSimpleName().replace("Test", "") + "_" + getSaferTestName().split(" ")[0]).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase(Locale.ROOT); |
| } |
| |
| @BeforeClass |
| public static void setupCluster() throws Exception { |
| System.setProperty("metricsEnabled", "true"); |
| System.setProperty("solr.waitToSeeReplicasInStateTimeoutSeconds", "30"); |
| configureCluster(2) // 2 + random().nextInt(3) |
| .addConfig("conf", configset("cloud-minimal-inplace-updates")) |
| .configure(); |
| Boolean useLegacyCloud = rarely(); |
| log.info("Using legacyCloud?: {}", useLegacyCloud); |
| CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud)); |
| CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient()); |
| assertEquals(0, response.getStatus()); |
| } |
| |
| @AfterClass |
| public static void tearDownCluster() { |
| TestInjection.reset(); |
| } |
| |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| collectionName = suggestedCollectionName(); |
| expectThrows(SolrException.class, () -> getCollectionState(collectionName)); |
| } |
| |
| @Override |
| public void tearDown() throws Exception { |
| for (JettySolrRunner jetty:cluster.getJettySolrRunners()) { |
| if (!jetty.isRunning()) { |
| log.warn("Jetty {} not running, probably some bad test. Starting it", jetty.getLocalPort()); |
| jetty.start(); |
| } |
| } |
| if (cluster.getSolrClient().getZkStateReader().getClusterState().getCollectionOrNull(collectionName) != null) { |
| log.info("tearDown deleting collection"); |
| CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient()); |
| waitForDeletion(collectionName); |
| } |
| super.tearDown(); |
| } |
| |
| /** |
| * Asserts that Update logs exist for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#NRT}, but not |
| * for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#PULL} |
| */ |
| private void assertUlogPresence(DocCollection collection) { |
| for (Slice s:collection.getSlices()) { |
| for (Replica r:s.getReplicas()) { |
| SolrCore core = null; |
| try { |
| core = cluster.getReplicaJetty(r).getCoreContainer().getCore(r.getCoreName()); |
| assertNotNull(core); |
| assertTrue("Update log should exist for replicas of type Append", |
| new java.io.File(core.getUlogDir()).exists()); |
| } finally { |
| core.close(); |
| } |
| } |
| } |
| } |
| |
| @Repeat(iterations=2) // 2 times to make sure cleanup is complete and we can create the same collection |
| public void testCreateDelete() throws Exception { |
| switch (random().nextInt(3)) { |
| case 0: |
| CollectionAdminRequest.createCollection(collectionName, "conf", 2, 0, 4, 0) |
| .setMaxShardsPerNode(100) |
| .process(cluster.getSolrClient()); |
| cluster.waitForActiveCollection(collectionName, 2, 8); |
| break; |
| case 1: |
| // Sometimes don't use SolrJ |
| String url = String.format(Locale.ROOT, "%s/admin/collections?action=CREATE&name=%s&collection.configName=%s&numShards=%s&tlogReplicas=%s&maxShardsPerNode=%s", |
| cluster.getRandomJetty(random()).getBaseUrl(), |
| collectionName, "conf", |
| 2, // numShards |
| 4, // tlogReplicas |
| 100); // maxShardsPerNode |
| HttpGet createCollectionGet = new HttpGet(url); |
| HttpResponse httpResponse = cluster.getSolrClient().getHttpClient().execute(createCollectionGet); |
| assertEquals(200, httpResponse.getStatusLine().getStatusCode()); |
| cluster.waitForActiveCollection(collectionName, 2, 8); |
| break; |
| case 2: |
| // Sometimes use V2 API |
| url = cluster.getRandomJetty(random()).getBaseUrl().toString() + "/____v2/c"; |
| String requestBody = String.format(Locale.ROOT, "{create:{name:%s, config:%s, numShards:%s, tlogReplicas:%s, maxShardsPerNode:%s}}", |
| collectionName, "conf", |
| 2, // numShards |
| 4, // tlogReplicas |
| 100); // maxShardsPerNode |
| HttpPost createCollectionPost = new HttpPost(url); |
| createCollectionPost.setHeader("Content-type", "application/json"); |
| createCollectionPost.setEntity(new StringEntity(requestBody)); |
| httpResponse = cluster.getSolrClient().getHttpClient().execute(createCollectionPost); |
| assertEquals(200, httpResponse.getStatusLine().getStatusCode()); |
| cluster.waitForActiveCollection(collectionName, 2, 8); |
| break; |
| } |
| |
| boolean reloaded = false; |
| while (true) { |
| DocCollection docCollection = getCollectionState(collectionName); |
| assertNotNull(docCollection); |
| assertEquals("Expecting 2 shards", |
| 2, docCollection.getSlices().size()); |
| assertEquals("Expecting 4 relpicas per shard", |
| 8, docCollection.getReplicas().size()); |
| assertEquals("Expecting 8 tlog replicas, 4 per shard", |
| 8, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); |
| assertEquals("Expecting no nrt replicas", |
| 0, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); |
| assertEquals("Expecting no pull replicas", |
| 0, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); |
| for (Slice s:docCollection.getSlices()) { |
| assertTrue(s.getLeader().getType() == Replica.Type.TLOG); |
| List<String> shardElectionNodes = cluster.getZkClient().getChildren(ZkStateReader.getShardLeadersElectPath(collectionName, s.getName()), null, true); |
| assertEquals("Unexpected election nodes for Shard: " + s.getName() + ": " + Arrays.toString(shardElectionNodes.toArray()), |
| 4, shardElectionNodes.size()); |
| } |
| assertUlogPresence(docCollection); |
| if (reloaded) { |
| break; |
| } else { |
| // reload |
| CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName) |
| .process(cluster.getSolrClient()); |
| assertEquals(0, response.getStatus()); |
| waitForState("failed waiting for active colletion", collectionName, clusterShape(2, 8)); |
| reloaded = true; |
| } |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| public void testAddDocs() throws Exception { |
| int numTlogReplicas = 1 + random().nextInt(3); |
| DocCollection docCollection = createAndWaitForCollection(1, 0, numTlogReplicas, 0); |
| assertEquals(1, docCollection.getSlices().size()); |
| |
| cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar")); |
| cluster.getSolrClient().commit(collectionName); |
| |
| Slice s = docCollection.getSlices().iterator().next(); |
| try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) { |
| assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound()); |
| } |
| |
| TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| for (Replica r:s.getReplicas(EnumSet.of(Replica.Type.TLOG))) { |
| //TODO: assert replication < REPLICATION_TIMEOUT_SECS |
| try (HttpSolrClient tlogReplicaClient = getHttpSolrClient(r.getCoreUrl())) { |
| while (true) { |
| try { |
| assertEquals("Replica " + r.getName() + " not up to date after 10 seconds", |
| 1, tlogReplicaClient.query(new SolrQuery("*:*")).getResults().getNumFound()); |
| // Append replicas process all updates |
| SolrQuery req = new SolrQuery( |
| "qt", "/admin/plugins", |
| "stats", "true"); |
| QueryResponse statsResponse = tlogReplicaClient.query(req); |
| assertEquals("Append replicas should recive all updates. Replica: " + r + ", response: " + statsResponse, |
| 1L, ((Map<String, Object>)(statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.cumulativeAdds.count")); |
| break; |
| } catch (AssertionError e) { |
| if (t.hasTimedOut()) { |
| throw e; |
| } else { |
| Thread.sleep(100); |
| } |
| } |
| } |
| } |
| } |
| assertUlogPresence(docCollection); |
| } |
| |
| public void testAddRemoveTlogReplica() throws Exception { |
| DocCollection docCollection = createAndWaitForCollection(2, 0, 1, 0); |
| assertEquals(2, docCollection.getSlices().size()); |
| |
| addReplicaToShard("shard1", Replica.Type.TLOG); |
| docCollection = assertNumberOfReplicas(0, 3, 0, true, false); |
| addReplicaToShard("shard2", Replica.Type.TLOG); |
| docCollection = assertNumberOfReplicas(0, 4, 0, true, false); |
| |
| waitForState("Expecting collection to have 2 shards and 2 replica each", collectionName, clusterShape(2, 4)); |
| |
| //Delete tlog replica from shard1 |
| CollectionAdminRequest.deleteReplica( |
| collectionName, |
| "shard1", |
| docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.TLOG)).get(0).getName()) |
| .process(cluster.getSolrClient()); |
| assertNumberOfReplicas(0, 3, 0, true, true); |
| } |
| |
| private void addReplicaToShard(String shardName, Replica.Type type) throws ClientProtocolException, IOException, SolrServerException { |
| switch (random().nextInt(3)) { |
| case 0: // Add replica with SolrJ |
| CollectionAdminResponse response = CollectionAdminRequest.addReplicaToShard(collectionName, shardName, type).process(cluster.getSolrClient()); |
| assertEquals("Unexpected response status: " + response.getStatus(), 0, response.getStatus()); |
| break; |
| case 1: // Add replica with V1 API |
| String url = String.format(Locale.ROOT, "%s/admin/collections?action=ADDREPLICA&collection=%s&shard=%s&type=%s", |
| cluster.getRandomJetty(random()).getBaseUrl(), |
| collectionName, |
| shardName, |
| type); |
| HttpGet addReplicaGet = new HttpGet(url); |
| HttpResponse httpResponse = cluster.getSolrClient().getHttpClient().execute(addReplicaGet); |
| assertEquals(200, httpResponse.getStatusLine().getStatusCode()); |
| break; |
| case 2:// Add replica with V2 API |
| url = String.format(Locale.ROOT, "%s/____v2/c/%s/shards", |
| cluster.getRandomJetty(random()).getBaseUrl(), |
| collectionName); |
| String requestBody = String.format(Locale.ROOT, "{add-replica:{shard:%s, type:%s}}", |
| shardName, |
| type); |
| HttpPost addReplicaPost = new HttpPost(url); |
| addReplicaPost.setHeader("Content-type", "application/json"); |
| addReplicaPost.setEntity(new StringEntity(requestBody)); |
| httpResponse = cluster.getSolrClient().getHttpClient().execute(addReplicaPost); |
| assertEquals(200, httpResponse.getStatusLine().getStatusCode()); |
| break; |
| } |
| } |
| |
| public void testRemoveLeader() throws Exception { |
| doReplaceLeader(true); |
| } |
| |
| public void testKillLeader() throws Exception { |
| doReplaceLeader(false); |
| } |
| |
| public void testRealTimeGet() throws SolrServerException, IOException, KeeperException, InterruptedException { |
| // should be redirected to Replica.Type.REALTIME |
| int numReplicas = random().nextBoolean()?1:2; |
| int numNrtReplicas = random().nextBoolean()?0:2; |
| CollectionAdminRequest.createCollection(collectionName, "conf", 1, numNrtReplicas, numReplicas, 0) |
| .setMaxShardsPerNode(100) |
| .process(cluster.getSolrClient()); |
| waitForState("Unexpected replica count", collectionName, activeReplicaCount(numNrtReplicas, numReplicas, 0)); |
| DocCollection docCollection = assertNumberOfReplicas(numNrtReplicas, numReplicas, 0, false, true); |
| HttpClient httpClient = cluster.getSolrClient().getHttpClient(); |
| int id = 0; |
| Slice slice = docCollection.getSlice("shard1"); |
| List<String> ids = new ArrayList<>(slice.getReplicas().size()); |
| for (Replica rAdd:slice.getReplicas()) { |
| try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) { |
| client.add(new SolrInputDocument("id", String.valueOf(id), "foo_s", "bar")); |
| } |
| SolrDocument docCloudClient = cluster.getSolrClient().getById(collectionName, String.valueOf(id)); |
| assertNotNull(docCloudClient); |
| assertEquals("bar", docCloudClient.getFieldValue("foo_s")); |
| for (Replica rGet:slice.getReplicas()) { |
| try (HttpSolrClient client = getHttpSolrClient(rGet.getCoreUrl(), httpClient)) { |
| SolrDocument doc = client.getById(String.valueOf(id)); |
| assertEquals("bar", doc.getFieldValue("foo_s")); |
| } |
| } |
| ids.add(String.valueOf(id)); |
| id++; |
| } |
| SolrDocumentList previousAllIdsResult = null; |
| for (Replica rAdd:slice.getReplicas()) { |
| try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) { |
| SolrDocumentList allIdsResult = client.getById(ids); |
| if (previousAllIdsResult != null) { |
| assertTrue(compareSolrDocumentList(previousAllIdsResult, allIdsResult)); |
| } else { |
| // set the first response here |
| previousAllIdsResult = allIdsResult; |
| assertEquals("Unexpected number of documents", ids.size(), allIdsResult.getNumFound()); |
| } |
| } |
| id++; |
| } |
| } |
| |
| /* |
| * validate leader election and that replication still happens on a new leader |
| */ |
| private void doReplaceLeader(boolean removeReplica) throws Exception { |
| DocCollection docCollection = createAndWaitForCollection(1, 0, 2, 0); |
| |
| // Add a document and commit |
| cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar")); |
| cluster.getSolrClient().commit(collectionName); |
| Slice s = docCollection.getSlices().iterator().next(); |
| try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) { |
| assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound()); |
| } |
| |
| waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS); |
| |
| // Delete leader replica from shard1 |
| JettySolrRunner leaderJetty = null; |
| if (removeReplica) { |
| CollectionAdminRequest.deleteReplica( |
| collectionName, |
| "shard1", |
| s.getLeader().getName()) |
| .process(cluster.getSolrClient()); |
| } else { |
| leaderJetty = cluster.getReplicaJetty(s.getLeader()); |
| leaderJetty.stop(); |
| waitForState("Leader replica not removed", collectionName, clusterShape(1, 1)); |
| // Wait for cluster state to be updated |
| waitForState("Replica state not updated in cluster state", |
| collectionName, clusterStateReflectsActiveAndDownReplicas()); |
| } |
| docCollection = assertNumberOfReplicas(0, 1, 0, true, true); |
| |
| // Wait until a new leader is elected |
| waitForLeaderChange(leaderJetty, "shard1"); |
| |
| // There is a new leader, I should be able to add and commit |
| cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo")); |
| cluster.getSolrClient().commit(collectionName); |
| |
| // Queries should still work |
| waitForNumDocsInAllReplicas(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS); |
| // Start back the node |
| if (removeReplica) { |
| addReplicaWithRetries(); |
| |
| } else { |
| leaderJetty.start(); |
| } |
| waitForState("Expected collection to be 1x2", collectionName, clusterShape(1, 2)); |
| // added replica should replicate from the leader |
| waitForNumDocsInAllReplicas(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS); |
| } |
| |
| private void addReplicaWithRetries() throws SolrServerException, IOException { |
| int maxAttempts = 3; |
| for (int i = 0; i < maxAttempts ; i++) { |
| try { |
| CollectionAdminResponse respone = CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.TLOG).process(cluster.getSolrClient()); |
| // This is an unfortunate hack. There are cases where the ADDREPLICA fails, will create a Jira to address that separately. for now, we'll retry |
| if (respone.isSuccess()) { |
| break; |
| } |
| log.error("Unsuccessful attempt to add replica. Attempt: {}/{}", i, maxAttempts); |
| } catch (SolrException e) { |
| log.error("Exception while adding replica. Attempt: {}/{}", i, maxAttempts, e); |
| } |
| } |
| } |
| |
| @Ignore |
| public void testKillTlogReplica() throws Exception { |
| DocCollection docCollection = createAndWaitForCollection(1, 0, 2, 0); |
| |
| waitForNumDocsInAllActiveReplicas(0); |
| cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar")); |
| cluster.getSolrClient().commit(collectionName); |
| waitForNumDocsInAllActiveReplicas(1); |
| |
| JettySolrRunner pullReplicaJetty = cluster.getReplicaJetty(docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.TLOG)).get(0)); |
| pullReplicaJetty.stop(); |
| waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0)); |
| waitForLeaderChange(pullReplicaJetty, "shard1"); |
| // // Also wait for the replica to be placed in state="down" |
| // waitForState("Didn't update state", collectionName, clusterStateReflectsActiveAndDownReplicas()); |
| |
| cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar")); |
| cluster.getSolrClient().commit(collectionName); |
| waitForNumDocsInAllActiveReplicas(2); |
| |
| pullReplicaJetty.start(); |
| waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0)); |
| waitForNumDocsInAllActiveReplicas(2); |
| } |
| |
| @Test |
| public void testOnlyLeaderIndexes() throws Exception { |
| createAndWaitForCollection(1, 0, 2, 0); |
| |
| CloudSolrClient cloudClient = cluster.getSolrClient(); |
| new UpdateRequest() |
| .add(sdoc("id", "1")) |
| .add(sdoc("id", "2")) |
| .add(sdoc("id", "3")) |
| .add(sdoc("id", "4")) |
| .process(cloudClient, collectionName); |
| |
| { |
| long docsPending = (long) getSolrCore(true).get(0).getMetricRegistry().getGauges().get("UPDATE.updateHandler.docsPending").getValue(); |
| assertEquals("Expected 4 docs are pending in core " + getSolrCore(true).get(0).getCoreDescriptor(),4, docsPending); |
| } |
| |
| for (SolrCore solrCore : getSolrCore(false)) { |
| long docsPending = (long) solrCore.getMetricRegistry().getGauges().get("UPDATE.updateHandler.docsPending").getValue(); |
| assertEquals("Expected non docs are pending in core " + solrCore.getCoreDescriptor(),0, docsPending); |
| } |
| |
| checkRTG(1, 4, cluster.getJettySolrRunners()); |
| |
| new UpdateRequest() |
| .deleteById("1") |
| .deleteByQuery("id:2") |
| .process(cloudClient, collectionName); |
| |
| // The DBQ is not processed at replicas, so we still can get doc2 and other docs by RTG |
| checkRTG(2,4, getSolrRunner(false)); |
| |
| Map<SolrCore, Long> timeCopyOverPerCores = getTimesCopyOverOldUpdates(getSolrCore(false)); |
| new UpdateRequest() |
| .commit(cloudClient, collectionName); |
| |
| waitForNumDocsInAllActiveReplicas(2); |
| // There are a small delay between new searcher and copy over old updates operation |
| TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| while (!timeOut.hasTimedOut()) { |
| if (assertCopyOverOldUpdates(1, timeCopyOverPerCores)) { |
| break; |
| } else { |
| Thread.sleep(500); |
| } |
| } |
| assertTrue("Expect only one copy over updates per cores", assertCopyOverOldUpdates(1, timeCopyOverPerCores)); |
| |
| boolean firstCommit = true; |
| // UpdateLog copy over old updates |
| for (int i = 15; i <= 150; i++) { |
| cloudClient.add(collectionName, sdoc("id",String.valueOf(i))); |
| if (random().nextInt(100) < 15 & i != 150) { |
| if (firstCommit) { |
| // because tlog replicas periodically ask leader for new segments, |
| // therefore the copy over old updates action must not be triggered until |
| // tlog replicas actually get new segments |
| assertTrue("Expect only one copy over updates per cores", assertCopyOverOldUpdates(1, timeCopyOverPerCores)); |
| firstCommit = false; |
| } |
| cloudClient.commit(collectionName); |
| } |
| } |
| checkRTG(120,150, cluster.getJettySolrRunners()); |
| waitForReplicasCatchUp(4 * REPLICATION_TIMEOUT_SECS); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public void testRecovery() throws Exception { |
| createAndWaitForCollection(1, 0, 2, 0); |
| |
| CloudSolrClient cloudClient = cluster.getSolrClient(); |
| new UpdateRequest() |
| .add(sdoc("id", "3")) |
| .add(sdoc("id", "4")) |
| .commit(cloudClient, collectionName); |
| new UpdateRequest() |
| .add(sdoc("id", "5")) |
| .process(cloudClient, collectionName); |
| JettySolrRunner solrRunner = getSolrRunner(false).get(0); |
| solrRunner.stop(); |
| waitForState("Replica still up", collectionName, activeReplicaCount(0,1,0)); |
| new UpdateRequest() |
| .add(sdoc("id", "6")) |
| .process(cloudClient, collectionName); |
| solrRunner.start(); |
| waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0)); |
| // We skip peerSync, so replica will always trigger commit on leader |
| // We query only the non-leader replicas, since we haven't opened a new searcher on the leader yet |
| waitForNumDocsInAllReplicas(4, getNonLeaderReplias(collectionName), 10); //timeout for stale collection state |
| |
| // If I add the doc immediately, the leader fails to communicate with the follower with broken pipe. |
| // Options are, wait or retry... |
| for (int i = 0; i < 3; i++) { |
| UpdateRequest ureq = new UpdateRequest().add(sdoc("id", "7")); |
| ureq.setParam("collection", collectionName); |
| NamedList<Object> response = cloudClient.request(ureq); |
| if ((Integer)((NamedList<Object>)response.get("responseHeader")).get(UpdateRequest.REPFACT) >= 2) { |
| break; |
| } |
| log.info("Min RF not achieved yet. retrying"); |
| } |
| checkRTG(3,7, cluster.getJettySolrRunners()); |
| try { |
| TestInjection.skipIndexWriterCommitOnClose = true; |
| solrRunner.stop(); |
| waitForState("Replica still up", collectionName, activeReplicaCount(0,1,0)); |
| } finally { |
| TestInjection.skipIndexWriterCommitOnClose = false; |
| } |
| solrRunner.start(); |
| waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0)); |
| waitForNumDocsInAllReplicas(5, getNonLeaderReplias(collectionName), 10); //timeout for stale collection state |
| checkRTG(3,7, cluster.getJettySolrRunners()); |
| cluster.getSolrClient().commit(collectionName); |
| |
| // Test replica recovery apply buffer updates |
| Semaphore waitingForBufferUpdates = new Semaphore(0); |
| Semaphore waitingForReplay = new Semaphore(0); |
| RecoveryStrategy.testing_beforeReplayBufferingUpdates = () -> { |
| try { |
| waitingForReplay.release(); |
| waitingForBufferUpdates.acquire(); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| fail("Test interrupted: " + e.getMessage()); |
| } |
| }; |
| solrRunner.stop(); |
| solrRunner.start(); |
| waitingForReplay.acquire(); |
| // If I add the doc immediately, the leader fails to communicate with the follower with broken pipe. |
| // Options are, wait or retry... |
| for (int i = 0; i < 3; i++) { |
| UpdateRequest ureq = new UpdateRequest().add(sdoc("id", "8")); |
| ureq.setParam("collection", collectionName); |
| NamedList<Object> response = cloudClient.request(ureq); |
| if ((Integer)((NamedList<Object>)response.get("responseHeader")).get(UpdateRequest.REPFACT) >= 2) { |
| break; |
| } |
| log.info("Min RF not achieved yet. retrying"); |
| } |
| new UpdateRequest() |
| .add(sdoc("id", "9")) |
| .add(sdoc("id", "10")) |
| .process(cloudClient, collectionName); |
| waitingForBufferUpdates.release(); |
| RecoveryStrategy.testing_beforeReplayBufferingUpdates = null; |
| waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0)); |
| checkRTG(3,10, cluster.getJettySolrRunners()); |
| for (SolrCore solrCore : getSolrCore(false)) { |
| RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null); |
| assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges()); |
| iwRef.decref(); |
| } |
| } |
| |
| private List<Replica> getNonLeaderReplias(String collectionName) { |
| return getCollectionState(collectionName).getReplicas().stream().filter( |
| (r)-> !r.getBool("leader", false)).collect(Collectors.toList()); |
| } |
| |
| public void testDeleteById() throws Exception{ |
| createAndWaitForCollection(1,0,2,0); |
| CloudSolrClient cloudClient = cluster.getSolrClient(); |
| new UpdateRequest() |
| .deleteByQuery("*:*") |
| .commit(cluster.getSolrClient(), collectionName); |
| new UpdateRequest() |
| .add(sdoc("id", "1")) |
| .commit(cloudClient, collectionName); |
| waitForNumDocsInAllActiveReplicas(1); |
| new UpdateRequest() |
| .deleteById("1") |
| .process(cloudClient, collectionName); |
| boolean successs = false; |
| try { |
| checkRTG(1, 1, cluster.getJettySolrRunners()); |
| successs = true; |
| } catch (AssertionError e) { |
| //expected |
| } |
| assertFalse("Doc1 is deleted but it's still exist", successs); |
| } |
| |
| public void testBasicLeaderElection() throws Exception { |
| createAndWaitForCollection(1,0,2,0); |
| CloudSolrClient cloudClient = cluster.getSolrClient(); |
| new UpdateRequest() |
| .deleteByQuery("*:*") |
| .commit(cluster.getSolrClient(), collectionName); |
| new UpdateRequest() |
| .add(sdoc("id", "1")) |
| .add(sdoc("id", "2")) |
| .process(cloudClient, collectionName); |
| JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0); |
| oldLeaderJetty.stop(); |
| waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0)); |
| |
| // Even after the replica is gone, a leader may not be elected yet. Wait for it. |
| waitForLeaderChange(oldLeaderJetty, "shard1"); |
| |
| new UpdateRequest() |
| .add(sdoc("id", "3")) |
| .add(sdoc("id", "4")) |
| .process(cloudClient, collectionName); |
| oldLeaderJetty.start(); |
| waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0)); |
| checkRTG(1,4, cluster.getJettySolrRunners()); |
| new UpdateRequest() |
| .commit(cloudClient, collectionName); |
| waitForNumDocsInAllActiveReplicas(4, 0); |
| } |
| public void testRebalanceLeaders() throws Exception { |
| createAndWaitForCollection(1,0,2,0); |
| CloudSolrClient cloudClient = cluster.getSolrClient(); |
| new UpdateRequest() |
| .deleteByQuery("*:*") |
| .commit(cluster.getSolrClient(), collectionName); |
| |
| // Find a replica which isn't leader |
| DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName); |
| Slice slice = docCollection.getSlices().iterator().next(); |
| Replica newLeader = null; |
| for (Replica replica : slice.getReplicas()) { |
| if (slice.getLeader() == replica) continue; |
| newLeader = replica; |
| break; |
| } |
| assertNotNull("Failed to find a candidate of new leader", newLeader); |
| |
| // Set preferredLeader flag to the replica |
| ModifiableSolrParams params = new ModifiableSolrParams(); |
| params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString()); |
| params.set("collection", collectionName); |
| params.set("shard", slice.getName()); |
| params.set("replica", newLeader.getName()); |
| params.set("property", "preferredLeader"); |
| params.set("property.value", "true"); |
| QueryRequest request = new QueryRequest(params); |
| request.setPath("/admin/collections"); |
| cloudClient.request(request); |
| |
| // Wait until a preferredleader flag is set to the new leader candidate |
| TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| while (!timeout.hasTimedOut()) { |
| Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getSlicesMap(); |
| Replica me = slices.get(slice.getName()).getReplica(newLeader.getName()); |
| if (me.getBool("property.preferredleader", false)) { |
| break; |
| } |
| Thread.sleep(100); |
| } |
| assertFalse("Timeout waiting for setting preferredleader flag", timeout.hasTimedOut()); |
| |
| // Rebalance leaders |
| params = new ModifiableSolrParams(); |
| params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString()); |
| params.set("collection", collectionName); |
| params.set("maxAtOnce", "10"); |
| request = new QueryRequest(params); |
| request.setPath("/admin/collections"); |
| cloudClient.request(request); |
| |
| // Wait until a new leader is elected |
| timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| while (!timeout.hasTimedOut()) { |
| docCollection = getCollectionState(collectionName); |
| Replica leader = docCollection.getSlice(slice.getName()).getLeader(); |
| if (leader != null && leader.getName().equals(newLeader.getName()) && |
| leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) { |
| break; |
| } |
| Thread.sleep(100); |
| } |
| assertFalse("Timeout waiting for a new leader to be elected", timeout.hasTimedOut()); |
| |
| new UpdateRequest() |
| .add(sdoc("id", "1")) |
| .add(sdoc("id", "2")) |
| .add(sdoc("id", "3")) |
| .add(sdoc("id", "4")) |
| .process(cloudClient, collectionName); |
| checkRTG(1,4, cluster.getJettySolrRunners()); |
| new UpdateRequest() |
| .commit(cloudClient, collectionName); |
| waitForNumDocsInAllActiveReplicas(4); |
| } |
| private void waitForLeaderChange(JettySolrRunner oldLeaderJetty, String shardName) { |
| waitForState("Expect new leader", collectionName, |
| (liveNodes, collectionState) -> { |
| Replica leader = collectionState.getLeader(shardName); |
| if (leader == null || !leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) { |
| return false; |
| } |
| return oldLeaderJetty == null || !leader.getNodeName().equals(oldLeaderJetty.getNodeName()); |
| } |
| ); |
| } |
| |
| public void testOutOfOrderDBQWithInPlaceUpdates() throws Exception { |
| createAndWaitForCollection(1,0,2,0); |
| assertFalse(getSolrCore(true).get(0).getLatestSchema().getField("inplace_updatable_int").indexed()); |
| assertFalse(getSolrCore(true).get(0).getLatestSchema().getField("inplace_updatable_int").stored()); |
| assertTrue(getSolrCore(true).get(0).getLatestSchema().getField("inplace_updatable_int").hasDocValues()); |
| List<UpdateRequest> updates = new ArrayList<>(); |
| updates.add(simulatedUpdateRequest(null, "id", 1, "title_s", "title0_new", "inplace_updatable_int", 5, "_version_", 1L)); // full update |
| updates.add(simulatedDBQ("inplace_updatable_int:5", 3L)); |
| updates.add(simulatedUpdateRequest(1L, "id", 1, "inplace_updatable_int", 6, "_version_", 2L)); |
| for (JettySolrRunner solrRunner: getSolrRunner(false)) { |
| try (SolrClient client = solrRunner.newClient()) { |
| for (UpdateRequest up : updates) { |
| up.process(client, collectionName); |
| } |
| } |
| } |
| JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0); |
| oldLeaderJetty.stop(); |
| waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0)); |
| waitForLeaderChange(oldLeaderJetty, "shard1"); |
| oldLeaderJetty.start(); |
| waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0)); |
| checkRTG(1,1, cluster.getJettySolrRunners()); |
| SolrDocument doc = cluster.getSolrClient().getById(collectionName,"1"); |
| assertNotNull(doc.get("title_s")); |
| } |
| |
| private UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException { |
| SolrInputDocument doc = sdoc(fields); |
| |
| // get baseUrl of the leader |
| String baseUrl = getBaseUrl(); |
| |
| UpdateRequest ur = new UpdateRequest(); |
| ur.add(doc); |
| ur.setParam("update.distrib", "FROMLEADER"); |
| if (prevVersion != null) { |
| ur.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion)); |
| ur.setParam("distrib.inplace.update", "true"); |
| } |
| ur.setParam("distrib.from", baseUrl); |
| return ur; |
| } |
| |
| private UpdateRequest simulatedDBQ(String query, long version) throws SolrServerException, IOException { |
| String baseUrl = getBaseUrl(); |
| |
| UpdateRequest ur = new UpdateRequest(); |
| ur.deleteByQuery(query); |
| ur.setParam("_version_", ""+version); |
| ur.setParam("update.distrib", "FROMLEADER"); |
| ur.setParam("distrib.from", baseUrl); |
| return ur; |
| } |
| |
| private String getBaseUrl() { |
| DocCollection collection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(collectionName); |
| Slice slice = collection.getSlice("shard1"); |
| return slice.getLeader().getCoreUrl(); |
| } |
| |
| private DocCollection createAndWaitForCollection(int numShards, int numNrtReplicas, int numTlogReplicas, int numPullReplicas) throws SolrServerException, IOException, KeeperException, InterruptedException { |
| CollectionAdminRequest.createCollection(collectionName, "conf", numShards, numNrtReplicas, numTlogReplicas, numPullReplicas) |
| .setMaxShardsPerNode(100) |
| .process(cluster.getSolrClient()); |
| int numReplicasPerShard = numNrtReplicas + numTlogReplicas + numPullReplicas; |
| waitForState("Expected collection to be created with " + numShards + " shards and " + numReplicasPerShard + " replicas", |
| collectionName, clusterShape(numShards, numShards * numReplicasPerShard)); |
| return assertNumberOfReplicas(numNrtReplicas*numShards, numTlogReplicas*numShards, numPullReplicas*numShards, false, true); |
| } |
| |
| private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, SolrServerException, InterruptedException { |
| waitForNumDocsInAllActiveReplicas(numDocs, REPLICATION_TIMEOUT_SECS); |
| } |
| |
| private void waitForNumDocsInAllActiveReplicas(int numDocs, int timeout) throws IOException, SolrServerException, InterruptedException { |
| DocCollection docCollection = getCollectionState(collectionName); |
| waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()), timeout); |
| } |
| |
| private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, int timeout) throws IOException, SolrServerException, InterruptedException { |
| waitForNumDocsInAllReplicas(numDocs, replicas, "*:*", timeout); |
| } |
| |
| private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, String query, int timeout) throws IOException, SolrServerException, InterruptedException { |
| TimeOut t = new TimeOut(timeout, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| for (Replica r:replicas) { |
| if (!r.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) { |
| continue; |
| } |
| try (HttpSolrClient replicaClient = getHttpSolrClient(r.getCoreUrl())) { |
| while (true) { |
| try { |
| assertEquals("Replica " + r.getName() + " not up to date after " + timeout + " seconds", |
| numDocs, replicaClient.query(new SolrQuery(query)).getResults().getNumFound()); |
| break; |
| } catch (AssertionError e) { |
| if (t.hasTimedOut()) { |
| throw e; |
| } else { |
| Thread.sleep(100); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| private void waitForDeletion(String collection) throws InterruptedException, KeeperException { |
| TimeOut t = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) { |
| try { |
| Thread.sleep(100); |
| if (t.hasTimedOut()) { |
| fail("Timed out waiting for collection " + collection + " to be deleted."); |
| } |
| cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collection); |
| } catch(SolrException e) { |
| return; |
| } |
| |
| } |
| } |
| |
| private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException { |
| if (updateCollection) { |
| cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName); |
| } |
| DocCollection docCollection = getCollectionState(collectionName); |
| assertNotNull(docCollection); |
| assertEquals("Unexpected number of nrt replicas: " + docCollection, numNrtReplicas, |
| docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); |
| assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas, |
| docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); |
| assertEquals("Unexpected number of tlog replicas: " + docCollection, numTlogReplicas, |
| docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); |
| return docCollection; |
| } |
| |
| /* |
| * passes only if all replicas are active or down, and the "liveNodes" reflect the same status |
| */ |
| private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() { |
| return (liveNodes, collectionState) -> { |
| for (Replica r:collectionState.getReplicas()) { |
| if (r.getState() != Replica.State.DOWN && r.getState() != Replica.State.ACTIVE) { |
| return false; |
| } |
| if (r.getState() == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) { |
| return false; |
| } |
| if (r.getState() == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) { |
| return false; |
| } |
| } |
| return true; |
| }; |
| } |
| |
| |
| private CollectionStatePredicate activeReplicaCount(int numNrtReplicas, int numTlogReplicas, int numPullReplicas) { |
| return (liveNodes, collectionState) -> { |
| int nrtFound = 0, tlogFound = 0, pullFound = 0; |
| if (collectionState == null) |
| return false; |
| for (Slice slice : collectionState) { |
| for (Replica replica : slice) { |
| if (replica.isActive(liveNodes)) |
| switch (replica.getType()) { |
| case TLOG: |
| tlogFound++; |
| break; |
| case PULL: |
| pullFound++; |
| break; |
| case NRT: |
| nrtFound++; |
| break; |
| default: |
| throw new AssertionError("Unexpected replica type"); |
| } |
| } |
| } |
| return numNrtReplicas == nrtFound && numTlogReplicas == tlogFound && numPullReplicas == pullFound; |
| }; |
| } |
| |
| private List<SolrCore> getSolrCore(boolean isLeader) { |
| List<SolrCore> rs = new ArrayList<>(); |
| |
| CloudSolrClient cloudClient = cluster.getSolrClient(); |
| DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName); |
| |
| for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) { |
| if (solrRunner.getCoreContainer() == null) continue; |
| for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) { |
| CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor(); |
| Slice slice = docCollection.getSlice(cloudDescriptor.getShardId()); |
| Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName()); |
| if (slice.getLeader().equals(replica) && isLeader) { |
| rs.add(solrCore); |
| } else if (!slice.getLeader().equals(replica) && !isLeader) { |
| rs.add(solrCore); |
| } |
| } |
| } |
| return rs; |
| } |
| |
| private void checkRTG(int from, int to, List<JettySolrRunner> solrRunners) throws Exception{ |
| for (JettySolrRunner solrRunner: solrRunners) { |
| try (SolrClient client = solrRunner.newClient()) { |
| for (int i = from; i <= to; i++) { |
| SolrQuery query = new SolrQuery(); |
| query.set("distrib", false); |
| query.setRequestHandler("/get"); |
| query.set("id",i); |
| QueryResponse res = client.query(collectionName, query); |
| assertNotNull("Can not find doc "+ i + " in " + solrRunner.getBaseUrl(),res.getResponse().get("doc")); |
| } |
| } |
| } |
| } |
| |
| private List<JettySolrRunner> getSolrRunner(boolean isLeader) { |
| List<JettySolrRunner> rs = new ArrayList<>(); |
| CloudSolrClient cloudClient = cluster.getSolrClient(); |
| DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName); |
| for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) { |
| if (solrRunner.getCoreContainer() == null) continue; |
| for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) { |
| CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor(); |
| Slice slice = docCollection.getSlice(cloudDescriptor.getShardId()); |
| Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName()); |
| if (slice.getLeader() == replica && isLeader) { |
| rs.add(solrRunner); |
| } else if (slice.getLeader() != replica && !isLeader) { |
| rs.add(solrRunner); |
| } |
| } |
| } |
| return rs; |
| } |
| |
| private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException { |
| String leaderTimeCommit = getSolrCore(true).get(0).getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY); |
| if (leaderTimeCommit == null) return; |
| for (int i = 0; i < numTry; i++) { |
| boolean inSync = true; |
| for (SolrCore solrCore : getSolrCore(false)) { |
| String replicateTimeCommit = solrCore.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY); |
| if (!leaderTimeCommit.equals(replicateTimeCommit)) { |
| inSync = false; |
| Thread.sleep(500); |
| break; |
| } |
| } |
| if (inSync) return; |
| } |
| |
| fail("Some replicas are not in sync with leader"); |
| |
| } |
| |
| private boolean assertCopyOverOldUpdates(long delta, Map<SolrCore, Long> timesPerCore) { |
| for (SolrCore core : timesPerCore.keySet()) { |
| if (timesPerCore.get(core) + delta != getTimesCopyOverOldUpdates(core)) return false; |
| } |
| return true; |
| } |
| |
| private Map<SolrCore, Long> getTimesCopyOverOldUpdates(List<SolrCore> cores) { |
| Map<SolrCore, Long> timesPerCore = new HashMap<>(); |
| for (SolrCore core : cores) { |
| long times = getTimesCopyOverOldUpdates(core); |
| timesPerCore.put(core, times); |
| } |
| return timesPerCore; |
| } |
| |
| private long getTimesCopyOverOldUpdates(SolrCore core) { |
| return ((Meter)core.getMetricRegistry().getMetrics().get("TLOG.copyOverOldUpdates.ops")).getCount(); |
| } |
| } |