blob: f4c5bb27327348aa8f2de4248709969731a2faaa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.overseer;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerTest;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeOut;
public class ZkStateReaderTest extends SolrTestCaseJ4 {
private static final long TIMEOUT = 30;
/** Uses explicit refresh to ensure latest changes are visible. */
public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
testStateFormatUpdate(true, true);
}
/** Uses explicit refresh to ensure latest changes are visible. */
public void testStateFormatUpdateWithExplicitRefreshLazy() throws Exception {
testStateFormatUpdate(true, false);
}
/** ZkStateReader should automatically pick up changes based on ZK watches. */
public void testStateFormatUpdateWithTimeDelay() throws Exception {
testStateFormatUpdate(false, true);
}
/** ZkStateReader should automatically pick up changes based on ZK watches. */
public void testStateFormatUpdateWithTimeDelayLazy() throws Exception {
testStateFormatUpdate(false, false);
}
public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting) throws Exception {
Path zkDir = createTempDir("testStateFormatUpdate");
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
try {
server.run();
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
if (isInteresting) {
reader.registerCore("c1");
}
ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
{
// create new collection with stateFormat = 1
DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
assertNotNull(map.get("c1"));
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
assertFalse(exists);
if (explicitRefresh) {
reader.forceUpdateCollection("c1");
} else {
reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
}
DocCollection collection = reader.getClusterState().getCollection("c1");
assertEquals(1, collection.getStateFormat());
}
{
// Now update the collection to stateFormat = 2
DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
writer.writePendingUpdates();
Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
assertNull(map.get("c1"));
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
assertTrue(exists);
if (explicitRefresh) {
reader.forceUpdateCollection("c1");
} else {
reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS,
(n, c) -> c != null && c.getStateFormat() == 2);
}
DocCollection collection = reader.getClusterState().getCollection("c1");
assertEquals(2, collection.getStateFormat());
}
} finally {
IOUtils.close(reader, zkClient);
server.shutdown();
}
}
public void testExternalCollectionWatchedNotWatched() throws Exception{
Path zkDir = createTempDir("testExternalCollectionWatchedNotWatched");
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
try {
server.run();
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
// create new collection with stateFormat = 2
ZkWriteCommand c1 = new ZkWriteCommand("c1",
new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
writer.writePendingUpdates();
reader.forceUpdateCollection("c1");
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
reader.registerCore("c1");
assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
reader.unregisterCore("c1");
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
} finally {
IOUtils.close(reader, zkClient);
server.shutdown();
}
}
public void testCollectionStateWatcherCaching() throws Exception {
Path zkDir = createTempDir("testCollectionStateWatcherCaching");
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
try {
server.run();
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
reader.waitForState("c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
boolean found = false;
TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
DocCollection c1 = reader.getClusterState().getCollection("c1");
if ("y".equals(c1.getStr("x"))) {
found = true;
break;
}
}
assertTrue("Could not find updated property in collection c1 even after 5 seconds", found);
} finally {
IOUtils.close(reader, zkClient);
server.shutdown();
}
}
public void testWatchedCollectionCreation() throws Exception {
Path zkDir = createTempDir("testWatchedCollectionCreation");
ZkTestServer server = new ZkTestServer(zkDir);
SolrZkClient zkClient = null;
ZkStateReader reader = null;
try {
server.run();
zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
ZkController.createClusterZkNodes(zkClient);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
reader.registerCore("c1");
// Initially there should be no c1 collection.
assertNull(reader.getClusterState().getCollectionRef("c1"));
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
reader.forceUpdateCollection("c1");
// Still no c1 collection, despite a collection path.
assertNull(reader.getClusterState().getCollectionRef("c1"));
ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
// create new collection with stateFormat = 2
DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
ZkWriteCommand wc = new ZkWriteCommand("c1", state);
writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
writer.writePendingUpdates();
assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
//reader.forceUpdateCollection("c1");
reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
assertNotNull(ref);
assertFalse(ref.isLazilyLoaded());
assertEquals(2, ref.get().getStateFormat());
} finally {
IOUtils.close(reader, zkClient);
server.shutdown();
}
}
}