blob: a839f3e6854825af5c2933b09ff12bda6d6c0b56 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.overseer;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerTest;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@LogLevel(
"org.apache.solr.common.cloud.ZkStateReader=DEBUG;org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG")
public class ZkStateReaderTest extends SolrTestCaseJ4 {
// Class logger, looked up from the declaring class.
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Wait budget handed to waitForState; the call site pairs it with TimeUnit.SECONDS.
private static final long TIMEOUT = 30;
/**
 * Bundles the ZooKeeper test server, the client connected to it, and the state reader/writer
 * built on that client, so that a test can release all of them with a single {@link #close()}.
 */
private static class TestFixture implements Closeable {
  private final ZkTestServer server;
  private final SolrZkClient zkClient;
  private final ZkStateReader reader;
  private final ZkStateWriter writer;

  private TestFixture(
      ZkTestServer server, SolrZkClient zkClient, ZkStateReader reader, ZkStateWriter writer) {
    this.server = server;
    this.zkClient = zkClient;
    this.reader = reader;
    this.writer = writer;
  }

  /**
   * Closes the reader and the client, then shuts the test server down.
   *
   * <p>The shutdown runs in a {@code finally} block so that a failure while closing the reader or
   * the client cannot leave the server running.
   *
   * @throws IOException if releasing any of the resources fails
   */
  @Override
  public void close() throws IOException {
    try {
      IOUtils.close(reader, zkClient);
    } finally {
      try {
        server.shutdown();
      } catch (InterruptedException e) {
        // ok. Shutting down anyway
      }
    }
  }
}
// Fixture for the running test: assigned in setUp() and closed in tearDown().
private TestFixture fixture = null;
/** Runs the inherited setup, then builds a fresh fixture named after the current test. */
@Before
public void setUp() throws Exception {
  super.setUp();
  final String testName = getTestName();
  this.fixture = setupTestFixture(testName);
}
/**
 * Closes the fixture (if one was created) and then runs the inherited teardown.
 *
 * <p>The inherited teardown is invoked from a {@code finally} block so it still runs when closing
 * the fixture throws.
 */
@After
public void tearDown() throws Exception {
  try {
    if (fixture != null) {
      fixture.close();
    }
  } finally {
    super.tearDown();
  }
}
/**
 * Starts a ZooKeeper test server in a fresh temp directory, connects a client, creates the
 * cluster znodes, and wires up a state reader and writer on top of them.
 *
 * <p>If any step after the server has been started fails, whatever was already created is
 * released before the failure is rethrown, so a broken setup does not leak the server or the
 * client.
 *
 * @param testPrefix prefix for the temporary ZooKeeper data directory
 * @return a fixture owning the server, client, reader and writer
 * @throws Exception if the server, client, reader or writer cannot be set up
 */
private static TestFixture setupTestFixture(String testPrefix) throws Exception {
  Path zkDir = createTempDir(testPrefix);
  ZkTestServer server = new ZkTestServer(zkDir);
  server.run();
  SolrZkClient zkClient = null;
  ZkStateReader reader = null;
  try {
    zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
    ZkController.createClusterZkNodes(zkClient);
    reader = new ZkStateReader(zkClient);
    reader.createClusterStateWatchersAndUpdate();
    ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
    return new TestFixture(server, zkClient, reader, writer);
  } catch (Exception | Error e) {
    // Best-effort cleanup: the original failure is what the caller needs to see.
    IOUtils.closeWhileHandlingException(reader, zkClient);
    try {
      server.shutdown();
    } catch (Exception shutdownFailure) {
      e.addSuppressed(shutdownFailure);
    }
    throw e;
  }
}
/**
 * Checks that the reference to c1 is lazily loaded while no core is registered for it, stops
 * being lazy once a core is registered, and is lazy again after the core is unregistered.
 */
public void testExternalCollectionWatchedNotWatched() throws Exception {
  final ZkStateWriter stateWriter = fixture.writer;
  final ZkStateReader stateReader = fixture.reader;

  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

  // Write an empty collection c1 through the state writer.
  DocCollection emptyCollection =
      new DocCollection(
          "c1",
          new HashMap<>(),
          Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c1")));
  ZkWriteCommand createCommand = new ZkWriteCommand("c1", emptyCollection);
  stateWriter.enqueueUpdate(
      stateReader.getClusterState(), Collections.singletonList(createCommand), null);
  stateWriter.writePendingUpdates();
  stateReader.forceUpdateCollection("c1");

  // No core registered yet: lazy reference.
  boolean lazyBeforeRegister = stateReader.getClusterState().getCollectionRef("c1").isLazilyLoaded();
  assertTrue(lazyBeforeRegister);

  // Core registered: the reference is no longer lazy.
  stateReader.registerCore("c1");
  boolean lazyWhileRegistered = stateReader.getClusterState().getCollectionRef("c1").isLazilyLoaded();
  assertFalse(lazyWhileRegistered);

  // Core unregistered: back to a lazy reference.
  stateReader.unregisterCore("c1");
  boolean lazyAfterUnregister = stateReader.getClusterState().getCollectionRef("c1").isLazilyLoaded();
  assertTrue(lazyAfterUnregister);
}
/**
 * Writes collection c1, waits for it through a collection state watcher, then rewrites it with an
 * extra property {@code x=y} and checks that the reader's cluster state picks up the new property
 * within 5 seconds.
 */
public void testCollectionStateWatcherCaching() throws Exception {
  ZkStateWriter writer = fixture.writer;
  ZkStateReader reader = fixture.reader;
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
  DocCollection state =
      new DocCollection(
          "c1",
          new HashMap<>(),
          Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c1")));
  ZkWriteCommand wc = new ZkWriteCommand("c1", state);
  writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
  writer.writePendingUpdates();
  assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
  reader.waitForState(
      "c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);

  // Rewrite the collection with an additional property.
  Map<String, Object> props = new HashMap<>();
  props.put("x", "y");
  props.put(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
  state =
      new DocCollection(
          "c1",
          new HashMap<>(),
          props,
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c1")));
  wc = new ZkWriteCommand("c1", state);
  writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
  writer.writePendingUpdates();

  boolean found = false;
  TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
  while (!timeOut.hasTimedOut()) {
    DocCollection c1 = reader.getClusterState().getCollection("c1");
    if ("y".equals(c1.getStr("x"))) {
      found = true;
      break;
    }
    // Back off between polls instead of spinning a core for up to 5 seconds.
    TimeUnit.MILLISECONDS.sleep(50);
  }
  assertTrue("Could not find updated property in collection c1 even after 5 seconds", found);
}
/**
 * Registers a core for c1 before the collection exists and checks that no reference shows up
 * until state.json is written, after which the reference is present and not lazily loaded.
 */
public void testWatchedCollectionCreation() throws Exception {
  final ZkStateWriter stateWriter = fixture.writer;
  final ZkStateReader stateReader = fixture.reader;

  stateReader.registerCore("c1");

  // Nothing has been created yet, so there is no reference to c1.
  assertNull(stateReader.getClusterState().getCollectionRef("c1"));

  // A bare collection path alone must not produce a reference either.
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
  stateReader.forceUpdateCollection("c1");
  assertNull(stateReader.getClusterState().getCollectionRef("c1"));

  // Now write the collection itself.
  DocCollection newCollection =
      new DocCollection(
          "c1",
          new HashMap<>(),
          Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c1")));
  ZkWriteCommand createCommand = new ZkWriteCommand("c1", newCollection);
  stateWriter.enqueueUpdate(
      stateReader.getClusterState(), Collections.singletonList(createCommand), null);
  stateWriter.writePendingUpdates();

  boolean stateJsonExists =
      fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
  assertTrue(stateJsonExists);

  // Wait for the reader to see the collection rather than forcing an update.
  stateReader.waitForState(
      "c1",
      TIMEOUT,
      TimeUnit.SECONDS,
      (liveNodes, collectionState) -> collectionState != null);

  ClusterState.CollectionRef watchedRef = stateReader.getClusterState().getCollectionRef("c1");
  assertNotNull(watchedRef);
  assertFalse(watchedRef.isLazilyLoaded());
}
/**
 * Verifies that znode and child versions are correct and version changes trigger cluster state
 * updates.
 *
 * <p>The test creates a watched collection c1, adds and flips a per-replica state entry, deletes
 * and re-creates the collection, and checks the znode version and child-nodes version after each
 * step.
 */
public void testNodeVersion() throws Exception {
  ZkStateWriter writer = fixture.writer;
  ZkStateReader reader = fixture.reader;
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
  ClusterState clusterState = reader.getClusterState();
  // create new collection
  DocCollection state =
      new DocCollection(
          "c1",
          new HashMap<>(),
          Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c1")));
  ZkWriteCommand wc = new ZkWriteCommand("c1", state);
  writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
  clusterState = writer.writePendingUpdates();
  // have to register it here after the updates, otherwise the child node watch will not be
  // inserted
  reader.registerCore("c1");
  // NOTE(review): this one TimeOut instance is reused by every waitFor below, so the waits
  // appear to share a single 5 second budget -- confirm against TimeOut's implementation.
  TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
  timeOut.waitFor(
      "Timeout on waiting for c1 to show up in cluster state",
      () -> reader.getClusterState().getCollectionOrNull("c1") != null);
  ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
  assertFalse(ref.isLazilyLoaded());
  assertEquals(0, ref.get().getZNodeVersion());
  // dummy node created +1 and deleted +1 so 2
  assertEquals(2, ref.get().getChildNodesVersion());

  // add a PRS entry for replica r1 in state DOWN
  DocCollection collection = ref.get();
  PerReplicaStates prs =
      PerReplicaStatesFetcher.fetch(
          collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates());
  PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs)
      .persist(collection.getZNode(), fixture.zkClient);
  timeOut.waitFor(
      "Timeout on waiting for c1 updated to have PRS state r1",
      () -> {
        DocCollection c = reader.getCollection("c1");
        return c.getPerReplicaStates() != null
            && c.getPerReplicaStates().get("r1") != null
            && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN;
      });
  ref = reader.getClusterState().getCollectionRef("c1");
  assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
  // one PRS entry was added, so the child version moves from 2 to 3
  assertEquals(3, ref.get().getChildNodesVersion());

  // flip r1 from DOWN to ACTIVE
  prs = ref.get().getPerReplicaStates();
  PerReplicaStatesOps.flipState("r1", Replica.State.ACTIVE, prs)
      .persist(collection.getZNode(), fixture.zkClient);
  timeOut.waitFor(
      "Timeout on waiting for c1 updated to have PRS state r1 marked as ACTIVE",
      () ->
          reader.getCollection("c1").getPerReplicaStates().get("r1").state
              == Replica.State.ACTIVE);
  ref = reader.getClusterState().getCollectionRef("c1");
  assertEquals(0, ref.get().getZNodeVersion()); // no change in Znode version
  // the flip is 1 del + 1 add, so the child version moves from 3 to 5
  assertEquals(5, ref.get().getChildNodesVersion());

  // now delete the collection
  wc = new ZkWriteCommand("c1", null);
  writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
  clusterState = writer.writePendingUpdates();
  timeOut.waitFor(
      "Timeout on waiting for c1 to be removed from cluster state",
      () -> reader.getClusterState().getCollectionOrNull("c1") == null);
  reader.unregisterCore("c1");

  // re-add the same collection
  wc = new ZkWriteCommand("c1", state);
  writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
  writer.writePendingUpdates();
  // re-register, otherwise the child watch would be missing from collection deletion
  reader.registerCore("c1");
  timeOut.waitFor(
      "Timeout on waiting for c1 to show up in cluster state again",
      () -> reader.getClusterState().getCollectionOrNull("c1") != null);
  ref = reader.getClusterState().getCollectionRef("c1");
  assertFalse(ref.isLazilyLoaded());
  assertEquals(0, ref.get().getZNodeVersion());
  assertEquals(2, ref.get().getChildNodesVersion()); // child node version is reset

  // re-add PRS
  collection = ref.get();
  prs =
      PerReplicaStatesFetcher.fetch(
          collection.getZNode(), fixture.zkClient, collection.getPerReplicaStates());
  PerReplicaStatesOps.addReplica("r1", Replica.State.DOWN, false, prs)
      .persist(collection.getZNode(), fixture.zkClient);
  timeOut.waitFor(
      "Timeout on waiting for c1 updated to have PRS state r1",
      () -> {
        DocCollection c = reader.getCollection("c1");
        return c.getPerReplicaStates() != null
            && c.getPerReplicaStates().get("r1") != null
            && c.getPerReplicaStates().get("r1").state == Replica.State.DOWN;
      });
  ref = reader.getClusterState().getCollectionRef("c1");
  // the child version restarted at 2 because state.json was deleted and re-created, so adding
  // r1 brings it to 3 again
  assertEquals(3, ref.get().getChildNodesVersion());
}
/**
 * Takes a watched collection c1 through creation, update and deletion, calling
 * forciblyRefreshAllClusterStateSlow() after each step, and checks that an unwatched collection
 * c2 written together with the deletion is exposed through a lazily loaded reference.
 */
public void testForciblyRefreshAllClusterState() throws Exception {
  final ZkStateWriter stateWriter = fixture.writer;
  final ZkStateReader stateReader = fixture.reader;

  // c1 is watched from the start, so its reference should never be lazy.
  stateReader.registerCore("c1");
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
  stateReader.forciblyRefreshAllClusterStateSlow();

  // Only the collection path exists so far; there is no c1 collection yet.
  assertNull(stateReader.getClusterState().getCollectionRef("c1"));

  // Step 1: create c1 with per-replica state switched on.
  DocCollection collectionState =
      new DocCollection(
          "c1",
          new HashMap<>(),
          Map.of(
              ZkStateReader.CONFIGNAME_PROP,
              ConfigSetsHandler.DEFAULT_CONFIGSET_NAME,
              DocCollection.CollectionStateProps.PER_REPLICA_STATE,
              "true"),
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c1")));
  ZkWriteCommand command = new ZkWriteCommand("c1", collectionState);
  stateWriter.enqueueUpdate(
      stateReader.getClusterState(), Collections.singletonList(command), null);
  stateWriter.writePendingUpdates();
  boolean stateJsonExists =
      fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
  assertTrue(stateJsonExists);

  stateReader.forciblyRefreshAllClusterStateSlow();
  ClusterState.CollectionRef collectionRef =
      stateReader.getClusterState().getCollectionRef("c1");
  assertNotNull(collectionRef);
  assertFalse(collectionRef.isLazilyLoaded());
  assertEquals(0, collectionRef.get().getZNodeVersion());

  // Step 2: rewrite c1 on top of the version that was just read.
  collectionState =
      new DocCollection(
          "c1",
          new HashMap<>(),
          Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
          DocRouter.DEFAULT,
          collectionRef.get().getZNodeVersion(),
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c1")));
  command = new ZkWriteCommand("c1", collectionState);
  stateWriter.enqueueUpdate(
      stateReader.getClusterState(), Collections.singletonList(command), null);
  stateWriter.writePendingUpdates();

  stateReader.forciblyRefreshAllClusterStateSlow();
  collectionRef = stateReader.getClusterState().getCollectionRef("c1");
  assertNotNull(collectionRef);
  assertFalse(collectionRef.isLazilyLoaded());
  assertEquals(1, collectionRef.get().getZNodeVersion());

  // Step 3: delete c1 and, in the same batch, add c2, which nobody watches.
  ZkWriteCommand deleteC1 = new ZkWriteCommand("c1", null);
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
  collectionState =
      new DocCollection(
          "c2",
          new HashMap<>(),
          Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c2")));
  ZkWriteCommand createC2 = new ZkWriteCommand("c2", collectionState);
  stateWriter.enqueueUpdate(
      stateReader.getClusterState(), Arrays.asList(deleteC1, createC2), null);
  stateWriter.writePendingUpdates();

  stateReader.forciblyRefreshAllClusterStateSlow();
  collectionRef = stateReader.getClusterState().getCollectionRef("c1");
  assertNull(collectionRef);
  collectionRef = stateReader.getClusterState().getCollectionRef("c2");
  assertNotNull(collectionRef);
  // c2 is not watched, so its reference is expected to be lazy.
  assertTrue("c2 should have been lazily loaded but is not!", collectionRef.isLazilyLoaded());
  assertEquals(0, collectionRef.get().getZNodeVersion());
}
/**
 * Checks that getCurrentCollections() is empty while only the path of a watched collection
 * exists, and reports both a watched collection (c1) and an unwatched one (c2) once their states
 * have been written.
 */
public void testGetCurrentCollections() throws Exception {
  ZkStateWriter writer = fixture.writer;
  ZkStateReader reader = fixture.reader;
  reader.registerCore("c1"); // listen to c1. not yet exist
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
  reader.forceUpdateCollection("c1");
  Set<String> currentCollections = reader.getCurrentCollections();
  assertEquals(0, currentCollections.size()); // no active collections yet

  // now create both c1 (watched) and c2 (not watched)
  DocCollection state1 =
      new DocCollection(
          "c1",
          new HashMap<>(),
          Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
          DocRouter.DEFAULT,
          0,
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c1")));
  ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
  DocCollection state2 =
      new DocCollection(
          "c2",
          new HashMap<>(),
          Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
          DocRouter.DEFAULT,
          0,
          // the PRS supplier must point at c2's own collection path, not c1's
          new PerReplicaStatesFetcher.LazyPrsSupplier(
              fixture.zkClient, DocCollection.getCollectionPath("c2")));
  // do not listen to c2
  fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
  ZkWriteCommand wc2 = new ZkWriteCommand("c2", state2);
  writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
  writer.writePendingUpdates();
  reader.forceUpdateCollection("c1");
  reader.forceUpdateCollection("c2");

  // should detect both collections (c1 watched, c2 lazy loaded)
  currentCollections = reader.getCurrentCollections();
  assertEquals(2, currentCollections.size());
}
/**
 * Exercises the window in which state updates driven by watch notifications compete with the
 * removal of a collection watcher.
 *
 * <p>A background task keeps rewriting c1 while the test registers a watcher that removes itself
 * on its second notification, removes the watcher explicitly as well, and then waits for the c1
 * reference to become lazily loaded. With a single map holding both the collection watches and
 * the latest state of watched collections, such a race should no longer exist.
 */
public void testWatchRaceCondition() throws Exception {
  final ExecutorService mutatorExecutor =
      ExecutorUtil.newMDCAwareSingleThreadExecutor(
          new SolrNamedThreadFactory("zkStateReaderTest"));
  CommonTestInjection.setDelay(1000);
  final AtomicBoolean stopRequested = new AtomicBoolean(false);
  try {
    final ZkStateWriter stateWriter = fixture.writer;
    final ZkStateReader stateReader = fixture.reader;
    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

    // Background task: rewrite c1 every 100 ms until told to stop, remembering any failure.
    final AtomicReference<Exception> mutatorFailure = new AtomicReference<>();
    mutatorExecutor.submit(
        () -> {
          try {
            ClusterState latest = stateReader.getClusterState();
            while (!stopRequested.get()) {
              DocCollection existing = latest.getCollectionOrNull("c1");
              int version = (existing == null) ? 0 : existing.getZNodeVersion();
              DocCollection rewritten =
                  new DocCollection(
                      "c1",
                      new HashMap<>(),
                      Map.of(
                          ZkStateReader.CONFIGNAME_PROP,
                          ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                      DocRouter.DEFAULT,
                      version,
                      new PerReplicaStatesFetcher.LazyPrsSupplier(
                          fixture.zkClient, DocCollection.getCollectionPath("c1")));
              ZkWriteCommand command = new ZkWriteCommand("c1", rewritten);
              stateWriter.enqueueUpdate(latest, Collections.singletonList(command), null);
              latest = stateWriter.writePendingUpdates();
              TimeUnit.MILLISECONDS.sleep(100);
            }
          } catch (Exception e) {
            mutatorFailure.set(e);
          }
          return null;
        });
    mutatorExecutor.shutdown();

    // Block until the collection state is visible to the reader.
    stateReader.waitForState("c1", 10, TimeUnit.SECONDS, docState -> docState != null);

    // The watcher asks to be removed once it has been notified twice.
    final CountDownLatch notifications = new CountDownLatch(2);
    DocCollectionWatcher selfRemovingWatcher =
        docState -> {
          notifications.countDown();
          return notifications.getCount() == 0;
        };
    stateReader.registerDocCollectionWatcher("c1", selfRemovingWatcher);
    boolean notifiedTwice = notifications.await(10, TimeUnit.SECONDS);
    assertTrue("Missing expected collection updates after the wait", notifiedTwice);
    stateReader.removeDocCollectionWatcher("c1", selfRemovingWatcher);

    // The cluster state may lag behind the removal above: the notification path may be removing
    // the watcher too and could still be in the middle of updating the cluster state.
    TimeOut lazyDeadline = new TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
    lazyDeadline.waitFor(
        "The ref is not lazily loaded after waiting",
        () -> stateReader.getClusterState().getCollectionRef("c1").isLazilyLoaded());

    Exception failure = mutatorFailure.get();
    if (failure != null) {
      throw failure;
    }
  } finally {
    stopRequested.set(true);
    CommonTestInjection.reset();
    ExecutorUtil.awaitTermination(mutatorExecutor);
  }
}
}