| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.cloud.overseer; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.nio.file.Path; |
| import java.time.Instant; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BrokenBarrierException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.atomic.LongAdder; |
| import org.apache.lucene.util.IOUtils; |
| import org.apache.solr.SolrTestCaseJ4; |
| import org.apache.solr.cloud.OverseerTest; |
| import org.apache.solr.cloud.Stats; |
| import org.apache.solr.cloud.ZkController; |
| import org.apache.solr.cloud.ZkTestServer; |
| import org.apache.solr.common.cloud.ClusterState; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.DocCollectionWatcher; |
| import org.apache.solr.common.cloud.DocRouter; |
| import org.apache.solr.common.cloud.PerReplicaStates; |
| import org.apache.solr.common.cloud.PerReplicaStatesOps; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.cloud.SolrZkClient; |
| import org.apache.solr.common.cloud.ZkStateReader; |
| import org.apache.solr.common.util.CommonTestInjection; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.SolrNamedThreadFactory; |
| import org.apache.solr.common.util.TimeSource; |
| import org.apache.solr.common.util.Utils; |
| import org.apache.solr.common.util.ZLibCompressor; |
| import org.apache.solr.handler.admin.ConfigSetsHandler; |
| import org.apache.solr.util.LogLevel; |
| import org.apache.solr.util.TimeOut; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.data.Stat; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @LogLevel( |
| "org.apache.solr.common.cloud.ZkStateReader=DEBUG;org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG") |
| public class ZkStateReaderTest extends SolrTestCaseJ4 { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| private static final long TIMEOUT = 30; |
| |
| private static class TestFixture implements Closeable { |
| private final ZkTestServer server; |
| private final SolrZkClient zkClient; |
| private final ZkStateReader reader; |
| private final ZkStateWriter writer; |
| |
| private TestFixture( |
| ZkTestServer server, SolrZkClient zkClient, ZkStateReader reader, ZkStateWriter writer) { |
| this.server = server; |
| this.zkClient = zkClient; |
| this.reader = reader; |
| this.writer = writer; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| IOUtils.close(reader, zkClient); |
| try { |
| server.shutdown(); |
| } catch (InterruptedException e) { |
| // ok. Shutting down anyway |
| } |
| } |
| } |
| |
| private TestFixture fixture = null; |
| |
| @Override |
| @Before |
| public void setUp() throws Exception { |
| super.setUp(); |
| fixture = setupTestFixture(getTestName(), -1); |
| } |
| |
| @Override |
| @After |
| public void tearDown() throws Exception { |
| if (fixture != null) { |
| fixture.close(); |
| } |
| super.tearDown(); |
| } |
| |
| private static TestFixture setupTestFixture(String testPrefix, int minStateByteLenForCompression) |
| throws Exception { |
| Path zkDir = createTempDir(testPrefix); |
| ZkTestServer server = new ZkTestServer(zkDir); |
| server.run(); |
| SolrZkClient zkClient = |
| new SolrZkClient.Builder() |
| .withUrl(server.getZkAddress()) |
| .withTimeout(OverseerTest.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS) |
| .build(); |
| ZkController.createClusterZkNodes(zkClient); |
| |
| ZkStateReader reader = new ZkStateReader(zkClient); |
| reader.createClusterStateWatchersAndUpdate(); |
| |
| ZkStateWriter writer = |
| new ZkStateWriter(reader, new Stats(), minStateByteLenForCompression, new ZLibCompressor()); |
| |
| return new TestFixture(server, zkClient, reader, writer); |
| } |
| |
| public void testExternalCollectionWatchedNotWatched() throws Exception { |
| ZkStateWriter writer = fixture.writer; |
| ZkStateReader reader = fixture.reader; |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| |
| // create new collection |
| ZkWriteCommand c1 = |
| new ZkWriteCommand( |
| "c1", |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1")))); |
| |
| writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null); |
| writer.writePendingUpdates(); |
| reader.forceUpdateCollection("c1"); |
| |
| assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); |
| reader.registerCore("c1"); |
| assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); |
| reader.unregisterCore("c1"); |
| assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); |
| } |
| |
| public void testCollectionStateWatcherCaching() throws Exception { |
| ZkStateWriter writer = fixture.writer; |
| ZkStateReader reader = fixture.reader; |
| |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| |
| DocCollection state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| ZkWriteCommand wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); |
| writer.writePendingUpdates(); |
| assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); |
| reader.waitForState( |
| "c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null); |
| |
| Map<String, Object> props = new HashMap<>(); |
| props.put("x", "y"); |
| props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME); |
| state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| props, |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); |
| writer.writePendingUpdates(); |
| |
| boolean found = false; |
| TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME); |
| while (!timeOut.hasTimedOut()) { |
| DocCollection c1 = reader.getClusterState().getCollection("c1"); |
| if ("y".equals(c1.getStr("x"))) { |
| found = true; |
| break; |
| } |
| } |
| assertTrue("Could not find updated property in collection c1 even after 5 seconds", found); |
| } |
| |
| public void testWatchedCollectionCreation() throws Exception { |
| ZkStateWriter writer = fixture.writer; |
| ZkStateReader reader = fixture.reader; |
| |
| reader.registerCore("c1"); |
| |
| // Initially there should be no c1 collection. |
| assertNull(reader.getClusterState().getCollectionRef("c1")); |
| |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| reader.forceUpdateCollection("c1"); |
| |
| // Still no c1 collection, despite a collection path. |
| assertNull(reader.getClusterState().getCollectionRef("c1")); |
| |
| // create new collection |
| DocCollection state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| ZkWriteCommand wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); |
| writer.writePendingUpdates(); |
| |
| assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); |
| |
| // reader.forceUpdateCollection("c1"); |
| reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null); |
| ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); |
| assertNotNull(ref); |
| assertFalse(ref.isLazilyLoaded()); |
| |
| Stat stat = new Stat(); |
| fixture.zkClient.getData(ZkStateReader.getCollectionPath("c1"), null, stat, false); |
| assertEquals(Instant.ofEpochMilli(stat.getCtime()), ref.get().getCreationTime()); |
| } |
| |
| /** |
| * Verifies that znode and child versions are correct and version changes trigger cluster state |
| * updates |
| */ |
| public void testNodeVersion() throws Exception { |
| ZkStateWriter writer = fixture.writer; |
| ZkStateReader reader = fixture.reader; |
| |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| |
| ClusterState clusterState = reader.getClusterState(); |
| // create new collection |
| DocCollection state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of( |
| ZkStateReader.CONFIGNAME_PROP, |
| ConfigSetsHandler.DEFAULT_CONFIGSET_NAME, |
| DocCollection.CollectionStateProps.PER_REPLICA_STATE, |
| "true"), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| ZkWriteCommand wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); |
| clusterState = writer.writePendingUpdates(); |
| |
| // have to register it here after the updates, otherwise the child node watch will not be |
| // inserted |
| reader.registerCore("c1"); |
| |
| TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); |
| timeOut.waitFor( |
| "Timeout on waiting for c1 to show up in cluster state", |
| () -> reader.getClusterState().getCollectionOrNull("c1") != null); |
| |
| ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); |
| assertFalse(ref.isLazilyLoaded()); |
| assertEquals(0, ref.get().getZNodeVersion()); |
| // no more dummy node |
| assertEquals(0, ref.get().getChildNodesVersion()); |
| |
| DocCollection collection = ref.get(); |
| PerReplicaStates prs = |
| PerReplicaStatesOps.fetch( |
| collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates()); |
| PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs) |
| .persist(collection.getZNode(), fixture.zkClient); |
| timeOut.waitFor( |
| "Timeout on waiting for c1 updated to have PRS state r1", |
| () -> { |
| DocCollection c = reader.getCollection("c1"); |
| return c.getPerReplicaStates() != null |
| && c.getPerReplicaStates().get("r1") != null |
| && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN; |
| }); |
| |
| ref = reader.getClusterState().getCollectionRef("c1"); |
| assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version |
| assertEquals(1, ref.get().getChildNodesVersion()); // but child version should be 1 now |
| |
| prs = ref.get().getPerReplicaStates(); |
| PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs) |
| .persist(collection.getZNode(), fixture.zkClient); |
| timeOut.waitFor( |
| "Timeout on waiting for c1 updated to have PRS state r1 marked as DOWN", |
| () -> |
| reader.getCollection("c1").getPerReplicaStates().get("r1").state |
| == Replica.State.ACTIVE); |
| |
| ref = reader.getClusterState().getCollectionRef("c1"); |
| assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version |
| // but child version should be 3 now (1 del + 1 add) |
| assertEquals(3, ref.get().getChildNodesVersion()); |
| |
| // now delete the collection |
| wc = new ZkWriteCommand("c1", null); |
| writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); |
| clusterState = writer.writePendingUpdates(); |
| timeOut.waitFor( |
| "Timeout on waiting for c1 to be removed from cluster state", |
| () -> reader.getClusterState().getCollectionOrNull("c1") == null); |
| |
| reader.unregisterCore("c1"); |
| // re-add the same collection |
| wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); |
| clusterState = writer.writePendingUpdates(); |
| // re-register, otherwise the child watch would be missing from collection deletion |
| reader.registerCore("c1"); |
| |
| // reader.forceUpdateCollection("c1"); |
| timeOut.waitFor( |
| "Timeout on waiting for c1 to show up in cluster state again", |
| () -> reader.getClusterState().getCollectionOrNull("c1") != null); |
| ref = reader.getClusterState().getCollectionRef("c1"); |
| assertFalse(ref.isLazilyLoaded()); |
| assertEquals(0, ref.get().getZNodeVersion()); |
| assertEquals(0, ref.get().getChildNodesVersion()); // child node version is reset |
| |
| // re-add PRS |
| collection = ref.get(); |
| prs = |
| PerReplicaStatesOps.fetch( |
| collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates()); |
| PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs) |
| .persist(collection.getZNode(), fixture.zkClient); |
| timeOut.waitFor( |
| "Timeout on waiting for c1 updated to have PRS state r1", |
| () -> { |
| DocCollection c = reader.getCollection("c1"); |
| return c.getPerReplicaStates() != null |
| && c.getPerReplicaStates().get("r1") != null |
| && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN; |
| }); |
| |
| ref = reader.getClusterState().getCollectionRef("c1"); |
| |
| // child version should be reset since the state.json node was deleted and re-created |
| assertEquals(1, ref.get().getChildNodesVersion()); |
| } |
| |
| public void testForciblyRefreshAllClusterState() throws Exception { |
| ZkStateWriter writer = fixture.writer; |
| ZkStateReader reader = fixture.reader; |
| |
| reader.registerCore("c1"); // watching c1, so it should get non lazy reference |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| |
| reader.forciblyRefreshAllClusterStateSlow(); |
| // Initially there should be no c1 collection. |
| assertNull(reader.getClusterState().getCollectionRef("c1")); |
| |
| // create new collection |
| DocCollection state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| ZkWriteCommand wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); |
| writer.writePendingUpdates(); |
| |
| assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); |
| |
| reader.forciblyRefreshAllClusterStateSlow(); |
| ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); |
| assertNotNull(ref); |
| assertFalse(ref.isLazilyLoaded()); |
| assertEquals(0, ref.get().getZNodeVersion()); |
| |
| // update the collection |
| state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| ref.get().getZNodeVersion(), |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); |
| writer.writePendingUpdates(); |
| |
| reader.forciblyRefreshAllClusterStateSlow(); |
| ref = reader.getClusterState().getCollectionRef("c1"); |
| assertNotNull(ref); |
| assertFalse(ref.isLazilyLoaded()); |
| assertEquals(1, ref.get().getZNodeVersion()); |
| |
| // delete the collection c1, add a collection c2 that is NOT watched |
| ZkWriteCommand wc1 = new ZkWriteCommand("c1", null); |
| |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); |
| state = |
| DocCollection.create( |
| "c2", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c2"))); |
| ZkWriteCommand wc2 = new ZkWriteCommand("c2", state); |
| |
| writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null); |
| writer.writePendingUpdates(); |
| |
| reader.forciblyRefreshAllClusterStateSlow(); |
| ref = reader.getClusterState().getCollectionRef("c1"); |
| assertNull(ref); |
| |
| ref = reader.getClusterState().getCollectionRef("c2"); |
| assertNotNull(ref); |
| assertTrue( |
| "c2 should have been lazily loaded but is not!", |
| ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not watched |
| assertEquals(0, ref.get().getZNodeVersion()); |
| } |
| |
| public void testForciblyRefreshAllClusterStateCompressed() throws Exception { |
| fixture.close(); |
| fixture = setupTestFixture(getTestName(), 0); |
| ZkStateWriter writer = fixture.writer; |
| ZkStateReader reader = fixture.reader; |
| |
| reader.registerCore("c1"); // watching c1, so it should get non lazy reference |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| |
| reader.forciblyRefreshAllClusterStateSlow(); |
| // Initially there should be no c1 collection. |
| assertNull(reader.getClusterState().getCollectionRef("c1")); |
| |
| // create new collection |
| DocCollection state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| null); |
| ZkWriteCommand wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); |
| writer.writePendingUpdates(); |
| |
| assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true)); |
| |
| reader.forciblyRefreshAllClusterStateSlow(); |
| ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1"); |
| assertNotNull(ref); |
| assertFalse(ref.isLazilyLoaded()); |
| assertEquals(0, ref.get().getZNodeVersion()); |
| |
| // update the collection |
| state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| ref.get().getZNodeVersion(), |
| Instant.now(), |
| null); |
| wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null); |
| writer.writePendingUpdates(); |
| |
| reader.forciblyRefreshAllClusterStateSlow(); |
| ref = reader.getClusterState().getCollectionRef("c1"); |
| assertNotNull(ref); |
| assertFalse(ref.isLazilyLoaded()); |
| assertEquals(1, ref.get().getZNodeVersion()); |
| |
| // delete the collection c1, add a collection c2 that is NOT watched |
| ZkWriteCommand wc1 = new ZkWriteCommand("c1", null); |
| |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); |
| state = |
| DocCollection.create( |
| "c2", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| null); |
| ZkWriteCommand wc2 = new ZkWriteCommand("c2", state); |
| |
| writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null); |
| writer.writePendingUpdates(); |
| |
| reader.forciblyRefreshAllClusterStateSlow(); |
| ref = reader.getClusterState().getCollectionRef("c1"); |
| assertNull(ref); |
| |
| ref = reader.getClusterState().getCollectionRef("c2"); |
| assertNotNull(ref); |
| assertTrue( |
| "c2 should have been lazily loaded but is not!", |
| ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not watched |
| assertEquals(0, ref.get().getZNodeVersion()); |
| } |
| |
| public void testGetCurrentCollections() throws Exception { |
| ZkStateWriter writer = fixture.writer; |
| ZkStateReader reader = fixture.reader; |
| |
| reader.registerCore("c1"); // listen to c1. not yet exist |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| reader.forceUpdateCollection("c1"); |
| Set<String> currentCollections = reader.getCurrentCollections(); |
| assertEquals(0, currentCollections.size()); // no active collections yet |
| |
| // now create both c1 (watched) and c2 (not watched) |
| DocCollection state1 = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1); |
| DocCollection state2 = |
| DocCollection.create( |
| "c2", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| |
| // do not listen to c2 |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true); |
| ZkWriteCommand wc2 = new ZkWriteCommand("c2", state2); |
| |
| writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null); |
| writer.writePendingUpdates(); |
| |
| reader.forceUpdateCollection("c1"); |
| reader.forceUpdateCollection("c2"); |
| |
| // should detect both collections (c1 watched, c2 lazy loaded) |
| currentCollections = reader.getCurrentCollections(); |
| assertEquals(2, currentCollections.size()); |
| } |
| |
| /** |
| * Simulates race condition that might arise when state updates triggered by watch notification |
| * contend with removal of collection watches. |
| * |
| * <p>Such race condition should no longer exist with the new code that uses a single map for both |
| * "collection watches" and "latest state of watched collection" |
| */ |
| public void testWatchRaceCondition() throws Exception { |
| ExecutorService executorService = |
| ExecutorUtil.newMDCAwareSingleThreadExecutor( |
| new SolrNamedThreadFactory("zkStateReaderTest")); |
| CommonTestInjection.setDelay(1000); |
| final AtomicBoolean stopMutatingThread = new AtomicBoolean(false); |
| try { |
| ZkStateWriter writer = fixture.writer; |
| final ZkStateReader reader = fixture.reader; |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| |
| // start another thread to constantly updating the state |
| final AtomicReference<Exception> updateException = new AtomicReference<>(); |
| executorService.submit( |
| () -> { |
| try { |
| ClusterState clusterState = reader.getClusterState(); |
| while (!stopMutatingThread.get()) { |
| DocCollection collection = clusterState.getCollectionOrNull("c1"); |
| int currentVersion = collection != null ? collection.getZNodeVersion() : 0; |
| // create new collection |
| DocCollection state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of( |
| ZkStateReader.CONFIGNAME_PROP, |
| ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| currentVersion, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| ZkWriteCommand wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); |
| clusterState = writer.writePendingUpdates(); |
| TimeUnit.MILLISECONDS.sleep(100); |
| } |
| } catch (Exception e) { |
| updateException.set(e); |
| } |
| return null; |
| }); |
| executorService.shutdown(); |
| |
| reader.waitForState( |
| "c1", |
| 10, |
| TimeUnit.SECONDS, |
| slices -> slices != null); // wait for the state to become available |
| |
| final CountDownLatch latch = new CountDownLatch(2); |
| |
| // remove itself on 2nd trigger |
| DocCollectionWatcher dummyWatcher = |
| collection -> { |
| latch.countDown(); |
| return latch.getCount() == 0; |
| }; |
| reader.registerDocCollectionWatcher("c1", dummyWatcher); |
| assertTrue( |
| "Missing expected collection updates after the wait", latch.await(10, TimeUnit.SECONDS)); |
| reader.removeDocCollectionWatcher("c1", dummyWatcher); |
| |
| // cluster state might not be updated right the way from the removeDocCollectionWatcher call |
| // above as org.apache.solr.common.cloud.ZkStateReader.Notification might remove the watcher |
| // as well and might still be in the middle of updating the cluster state. |
| TimeOut timeOut = new TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); |
| timeOut.waitFor( |
| "The ref is not lazily loaded after waiting", |
| () -> reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); |
| |
| if (updateException.get() != null) { |
| throw (updateException.get()); |
| } |
| } finally { |
| stopMutatingThread.set(true); |
| CommonTestInjection.reset(); |
| ExecutorUtil.awaitTermination(executorService); |
| } |
| } |
| |
| /** |
| * Simulates race condition that can arise from the normal way in which the removal of collection |
| * StateWatchers is deferred. |
| * |
| * <p>StateWatchers are registered at the level of Zk code, so when StateWatchers are removed in |
| * Solr code, the actual removal is deferred until the next callback for the associated collection |
| * fires, at which point the removed watcher should allow itself to expire. If a watcher is |
| * re-added for the associated collection in the intervening time, only the most recently added |
| * watcher should re-register; the removed watcher should simply expire. |
| * |
| * <p>Duplicate/redundant StateWatchers should no longer be registered with the new code that |
| * tracks the currently registered singleton-per-collection watcher in Solr code, and only |
| * re-registers the currently active watcher, with all other watchers allowing themselves to |
| * expire. |
| */ |
| public void testStateWatcherRaceCondition() throws Exception { |
| ZkStateWriter writer = fixture.writer; |
| final ZkStateReader reader = fixture.reader; |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true); |
| int extraWatchers = 10; |
| int iterations = 10; |
| for (int i = 0; i < extraWatchers; i++) { |
| // add and remove a bunch of watchers |
| DocCollectionWatcher w = (coll) -> false; |
| try { |
| reader.registerDocCollectionWatcher("c1", w); |
| } finally { |
| reader.removeDocCollectionWatcher("c1", w); |
| } |
| } |
| final ConcurrentHashMap<Integer, LongAdder> invoked = new ConcurrentHashMap<>(); |
| CyclicBarrier barrier = new CyclicBarrier(2); |
| reader.registerDocCollectionWatcher( |
| "c1", |
| (coll) -> { |
| // add a watcher that tracks how many times it's invoked per znode version |
| if (coll != null) { |
| try { |
| barrier.await(250, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException | TimeoutException | BrokenBarrierException e) { |
| throw new RuntimeException(e); |
| } |
| invoked.computeIfAbsent(coll.getZNodeVersion(), (k) -> new LongAdder()).increment(); |
| } |
| return false; |
| }); |
| |
| ClusterState clusterState = reader.getClusterState(); |
| int dataVersion = -1; |
| for (int i = 0; i < iterations; i++) { |
| // create or update collection |
| DocCollection state = |
| DocCollection.create( |
| "c1", |
| new HashMap<>(), |
| Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME), |
| DocRouter.DEFAULT, |
| dataVersion, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath("c1"))); |
| ZkWriteCommand wc = new ZkWriteCommand("c1", state); |
| writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); |
| clusterState = writer.writePendingUpdates(); |
| barrier.await(250, TimeUnit.MILLISECONDS); // wait for the watch callback to execute |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1" + i, true); |
| dataVersion = clusterState.getCollectionOrNull("c1").getZNodeVersion(); |
| } |
| // expect to have been invoked for each iteration ... |
| assertEquals(iterations, invoked.size()); |
| // ... and only _once_ for each iteration |
| assertTrue( |
| "wrong number of watchers (expected 1): " + invoked, |
| invoked.values().stream().mapToLong(LongAdder::sum).allMatch((l) -> l == 1)); |
| } |
| |
| /** |
| * Ensure that collection state fetching (getCollectionLive etc.) would not throw exception when |
| * the state.json is deleted in between the state.json read and PRS entries read |
| */ |
| public void testDeletePrsCollection() throws Exception { |
| ZkStateWriter writer = fixture.writer; |
| ZkStateReader reader = fixture.reader; |
| |
| String collectionName = "c1"; |
| fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true); |
| |
| ClusterState clusterState = reader.getClusterState(); |
| |
| String nodeName = "node1:10000_solr"; |
| String sliceName = "shard1"; |
| Slice slice = new Slice(sliceName, Map.of(), Map.of(), collectionName); |
| |
| // create new collection |
| DocCollection state = |
| DocCollection.create( |
| collectionName, |
| Map.of(sliceName, slice), |
| Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true), |
| DocRouter.DEFAULT, |
| 0, |
| Instant.now(), |
| PerReplicaStatesOps.getZkClientPrsSupplier( |
| fixture.zkClient, DocCollection.getCollectionPath(collectionName))); |
| ZkWriteCommand wc = new ZkWriteCommand(collectionName, state); |
| writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); |
| clusterState = writer.writePendingUpdates(); |
| |
| TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME); |
| timeOut.waitFor( |
| "Timeout on waiting for c1 to show up in cluster state", |
| () -> reader.getClusterState().getCollectionOrNull(collectionName) != null); |
| |
| String collectionPath = ZkStateReader.getCollectionPath(collectionName); |
| |
| // now create the replica, take note that this has to be done after DocCollection creation with |
| // empty slice, otherwise the DocCollection ctor would fetch the PRS entries and throw |
| // exceptions |
| String replicaBaseUrl = Utils.getBaseUrlForNodeName(nodeName, "http"); |
| |
| String replicaName = "replica1"; |
| Replica replica = |
| new Replica( |
| replicaName, |
| Map.of( |
| ZkStateReader.CORE_NAME_PROP, |
| "core1", |
| ZkStateReader.STATE_PROP, |
| Replica.State.ACTIVE.toString(), |
| ZkStateReader.NODE_NAME_PROP, |
| nodeName, |
| ZkStateReader.BASE_URL_PROP, |
| replicaBaseUrl, |
| ZkStateReader.REPLICA_TYPE, |
| Replica.Type.NRT.name()), |
| collectionName, |
| sliceName); |
| |
| wc = |
| new ZkWriteCommand( |
| collectionName, SliceMutator.updateReplica(state, slice, replica.getName(), replica)); |
| writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null); |
| clusterState = writer.writePendingUpdates(); |
| |
| timeOut.waitFor( |
| "Timeout on waiting for replica to show up in cluster state", |
| () -> |
| reader.getCollectionLive(collectionName).getSlice(sliceName).getReplica(replicaName) |
| != null); |
| |
| try (CommonTestInjection.BreakpointSetter breakpointSetter = |
| new CommonTestInjection.BreakpointSetter()) { |
| // set breakpoint such that after state.json fetch and before PRS entry fetch, we can delete |
| // the state.json and PRS entries to trigger the race condition |
| breakpointSetter.setImplementation( |
| PerReplicaStatesOps.class.getName() + "/beforePrsFetch", |
| (args) -> { |
| try { |
| // this is invoked after ZkStateReader.fetchCollectionState has fetched the state.json |
| // but before PRS entries. |
| // call delete state.json on ZK directly, very tricky to control execution order with |
| // writer.enqueueUpdate |
| reader.getZkClient().clean(collectionPath); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } catch (KeeperException e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| |
| // set breakpoint to verify the expected PrsZkNodeNotFoundException is indeed thrown within |
| // the execution flow, such exception is caught within the logic and not thrown to the |
| // caller |
| AtomicBoolean prsZkNodeNotFoundExceptionThrown = new AtomicBoolean(false); |
| breakpointSetter.setImplementation( |
| ZkStateReader.class.getName() + "/exercised", |
| (args) -> { |
| if (args[0] instanceof PerReplicaStatesOps.PrsZkNodeNotFoundException) { |
| prsZkNodeNotFoundExceptionThrown.set(true); |
| } |
| }); |
| |
| timeOut.waitFor( |
| "Timeout waiting for collection state to become null", |
| () -> { |
| // this should not throw exception even if the PRS entry read is delayed artificially |
| // (by previous command) and deleted after the following getCollectionLive call |
| return reader.getCollectionLive(collectionName) == null; |
| }); |
| |
| assertTrue(prsZkNodeNotFoundExceptionThrown.get()); |
| } |
| } |
| } |