blob: 4a05bb013aed7a57118d9ca002667b90379bc834 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_ACTIVE;
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_INACTIVE;
import static org.apache.solr.cloud.autoscaling.OverseerTriggerThread.MARKER_STATE;
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static volatile CountDownLatch triggerFiredLatch;
private static volatile CountDownLatch listenerEventLatch;
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
private volatile ZkStateReader zkStateReader;
private static final ReentrantLock lock = new ReentrantLock();
@Before
public void setupCluster() throws Exception {
System.setProperty("metricsEnabled", "true");
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
zkStateReader = cluster.getSolrClient().getZkStateReader();
// disable .scheduled_maintenance (once it exists)
CloudTestUtils.waitForTriggerToBeScheduled(cluster.getOpenOverseer().getSolrCloudManager(), ".scheduled_maintenance");
CloudTestUtils.suspendTrigger(cluster.getOpenOverseer().getSolrCloudManager(), ".scheduled_maintenance");
}
@After
public void teardownCluster() throws Exception {
shutdownCluster();
}
private static CountDownLatch getTriggerFiredLatch() {
return triggerFiredLatch;
}
//@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13376")
@Test
public void testNodeMarkersRegistration() throws Exception {
triggerFiredLatch = new CountDownLatch(1);
listenerEventLatch = new CountDownLatch(1);
TestLiveNodesListener listener = registerLiveNodesListener();
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
String overseerLeader = (String) overSeerStatus.get("leader");
int overseerLeaderIndex = 0;
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
if (jetty.getNodeName().equals(overseerLeader)) {
overseerLeaderIndex = i;
break;
}
}
// add a nodes
JettySolrRunner node = cluster.startJettySolrRunner();
cluster.waitForAllNodes(30);
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
assertEquals(1, listener.addedNodes.size());
assertTrue(listener.addedNodes.toString(), listener.addedNodes.contains(node.getNodeName()));
// verify that a znode doesn't exist (no trigger)
String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
listener.reset();
// stop overseer
log.info("====== KILL OVERSEER 1");
JettySolrRunner j = cluster.stopJettySolrRunner(overseerLeaderIndex);
cluster.waitForJettyToStop(j);
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
assertEquals(0, listener.addedNodes.size());
// wait until the new overseer is up
Thread.sleep(5000);
String newOverseerLeader;
do {
overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
newOverseerLeader = (String) overSeerStatus.get("leader");
} while (newOverseerLeader == null || newOverseerLeader.equals(overseerLeader));
assertEquals(1, listener.lostNodes.size());
assertEquals(overseerLeader, listener.lostNodes.iterator().next());
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
AtomicBoolean markerInactive = new AtomicBoolean();
try {
timeout.waitFor("nodeLost marker to get inactive", () -> {
try {
if (!zkClient().exists(pathLost, true)) {
throw new RuntimeException("marker " + pathLost + " should exist!");
}
Map<String, Object> markerData = Utils.getJson(zkClient(), pathLost, true);
markerInactive.set(markerData.getOrDefault(MARKER_STATE, MARKER_ACTIVE).equals(MARKER_INACTIVE));
return markerInactive.get();
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
return false;
}
});
} catch (TimeoutException e) {
// okay
}
// verify that the marker is inactive - the new overseer should deactivate markers once they are processed
assertTrue("Marker " + pathLost + " still active!", markerInactive.get());
listener.reset();
// set up triggers
CloudSolrClient solrClient = cluster.getSolrClient();
log.info("====== ADD TRIGGERS");
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_triggerMR'," +
"'event' : 'nodeAdded'," +
"'waitFor' : '1s'," +
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
"}}";
@SuppressWarnings({"rawtypes"})
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_lost_triggerMR'," +
"'event' : 'nodeLost'," +
"'waitFor' : '1s'," +
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
"}}";
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
String setListener = "{\n" +
" \"set-listener\" : {\n" +
" \"name\" : \"listener_node_added_triggerMR\",\n" +
" \"trigger\" : \"node_added_triggerMR\",\n" +
" \"stage\" : \"STARTED\",\n" +
" \"class\" : \"" + AssertingListener.class.getName() + "\"\n" +
" }\n" +
"}";
req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setListener);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
overseerLeader = (String) overSeerStatus.get("leader");
overseerLeaderIndex = 0;
for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
if (jetty.getNodeName().equals(overseerLeader)) {
overseerLeaderIndex = i;
break;
}
}
// create another node
log.info("====== ADD NODE 1");
JettySolrRunner node1 = cluster.startJettySolrRunner();
cluster.waitForAllNodes(30);
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
assertEquals(1, listener.addedNodes.size());
assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
// verify that a znode exists
pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
listenerEventLatch.countDown(); // let the trigger thread continue
assertTrue(triggerFiredLatch.await(10, TimeUnit.SECONDS));
// kill this node
listener.reset();
events.clear();
triggerFiredLatch = new CountDownLatch(1);
String node1Name = node1.getNodeName();
cluster.stopJettySolrRunner(node1);
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
assertEquals(1, listener.lostNodes.size());
assertEquals(node1Name, listener.lostNodes.iterator().next());
// verify that a znode exists
String pathLost2 = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + node1Name;
assertTrue("Path " + pathLost2 + " wasn't created", zkClient().exists(pathLost2, true));
listenerEventLatch.countDown(); // let the trigger thread continue
assertTrue(triggerFiredLatch.await(10, TimeUnit.SECONDS));
// triggers don't remove markers
assertTrue("Path " + pathLost2 + " should still exist", zkClient().exists(pathLost2, true));
listener.reset();
events.clear();
triggerFiredLatch = new CountDownLatch(1);
// kill overseer again
log.info("====== KILL OVERSEER 2");
cluster.stopJettySolrRunner(overseerLeaderIndex);
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
fail("Trigger should have fired by now");
}
assertEquals(1, events.size());
TriggerEvent ev = events.iterator().next();
@SuppressWarnings({"unchecked"})
List<String> nodeNames = (List<String>) ev.getProperty(TriggerEvent.NODE_NAMES);
assertTrue(nodeNames.contains(overseerLeader));
assertEquals(TriggerEventType.NODELOST, ev.getEventType());
}
private TestLiveNodesListener registerLiveNodesListener() {
TestLiveNodesListener listener = new TestLiveNodesListener();
zkStateReader.registerLiveNodesListener(listener);
return listener;
}
private static class TestLiveNodesListener implements LiveNodesListener {
Set<String> lostNodes = ConcurrentHashMap.newKeySet();
Set<String> addedNodes = ConcurrentHashMap.newKeySet();
CountDownLatch onChangeLatch = new CountDownLatch(1);
public void reset() {
lostNodes.clear();
addedNodes.clear();
onChangeLatch = new CountDownLatch(1);
}
@Override
public boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
onChangeLatch.countDown();
Set<String> old = new HashSet<>(oldLiveNodes);
old.removeAll(newLiveNodes);
if (!old.isEmpty()) {
lostNodes.addAll(old);
}
newLiveNodes.removeAll(oldLiveNodes);
if (!newLiveNodes.isEmpty()) {
addedNodes.addAll(newLiveNodes);
}
return false;
}
}
public static class TestEventMarkerAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext actionContext) {
boolean locked = lock.tryLock();
if (!locked) {
log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
return;
}
try {
events.add(event);
getTriggerFiredLatch().countDown();
} catch (Throwable t) {
log.debug("--throwable", t);
throw t;
} finally {
lock.unlock();
}
}
@Override
public void init() throws Exception {
log.info("TestEventMarkerAction init");
super.init();
}
}
public static class AssertingListener extends TriggerListenerBase {
@Override
public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
if (!Thread.currentThread().getName().startsWith("ScheduledTrigger")) {
// for future safety
throw new IllegalThreadStateException("AssertingListener should have been invoked by a thread from the scheduled trigger thread pool");
}
log.debug(" --- listener fired for event: {}, stage: {}", event, stage);
listenerEventLatch.await();
log.debug(" --- listener wait complete for event: {}, stage: {}", event, stage);
}
}
}