blob: f1d38aab8e50ef6f6516aa0e465a7e5ded0019c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling.sim;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.AutoScaling;
import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
import org.apache.solr.cloud.autoscaling.TriggerAction;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerValidationException;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.SolrResourceLoader;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test for {@link NodeLostTrigger}
*/
public class TestSimNodeLostTrigger extends SimSolrCloudTestCase {
private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
private static final int SPEED = 50;
// use the same time source as the trigger
private static TimeSource timeSource;
// currentTimeMillis is not as precise so to avoid false positives while comparing time of fire, we add some delta
private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
@Before
public void beforeTest() throws Exception {
configureCluster(5, TimeSource.get("simTime:" + SPEED));
timeSource = cluster.getTimeSource();
actionConstructorCalled = new AtomicBoolean(false);
actionInitCalled = new AtomicBoolean(false);
actionCloseCalled = new AtomicBoolean(false);
}
@After
public void afterTest() throws Exception {
shutdownCluster();
}
@Test
public void testTrigger() throws Exception {
long waitForSeconds = 1 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger")) {
trigger.configure(cluster.getLoader(), cluster, props);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
Iterator<String> it = cluster.getLiveNodesSet().get().iterator();
String lostNodeName1 = it.next();
String lostNodeName2 = it.next();
cluster.simRemoveNode(lostNodeName1, false);
cluster.simRemoveNode(lostNodeName2, false);
timeSource.sleep(1000);
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
}
} else {
fail("NodeLostListener was fired more than once!");
}
return true;
});
int counter = 0;
do {
trigger.run();
timeSource.sleep(1000);
if (counter++ > 20) {
fail("Lost node was not discovered by trigger even after 10 seconds");
}
} while (!fired.get());
TriggerEvent nodeLostEvent = eventRef.get();
assertNotNull(nodeLostEvent);
@SuppressWarnings({"unchecked"})
List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
assertTrue(nodeNames + " doesn't contain " + lostNodeName1, nodeNames.contains(lostNodeName1));
assertTrue(nodeNames + " doesn't contain " + lostNodeName2, nodeNames.contains(lostNodeName2));
}
// remove a node but add it back before the waitFor period expires
// and assert that the trigger doesn't fire at all
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger")) {
trigger.configure(cluster.getLoader(), cluster, props);
final long waitTime = 2;
props.put("waitFor", waitTime);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode();
cluster.simRemoveNode(lostNode, false);
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
long currentTimeNanos = timeSource.getTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
}
} else {
fail("NodeLostListener was fired more than once!");
}
return true;
});
trigger.run(); // first run should detect the lost node
int counter = 0;
do {
if (cluster.getLiveNodesSet().get().size() == 2) {
break;
}
timeSource.sleep(100);
if (counter++ > 20) {
fail("Live nodes not updated!");
}
} while (true);
counter = 0;
cluster.getSimClusterStateProvider().simRestoreNode(lostNode);
do {
trigger.run();
timeSource.sleep(1000);
if (counter++ > waitTime + 1) { // run it a little more than the wait time
break;
}
} while (true);
// ensure the event was not fired
assertFalse(fired.get());
}
}
public void testActionLifecycle() throws Exception {
Map<String, Object> props = createTriggerProps(0);
@SuppressWarnings({"unchecked"})
List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
Map<String, String> action = new HashMap<>(2);
action.put("name", "testActionInit");
action.put("class", AssertInitTriggerAction.class.getName());
actions.add(action);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger")) {
trigger.configure(cluster.getLoader(), cluster, props);
assertEquals(true, actionConstructorCalled.get());
assertEquals(false, actionInitCalled.get());
assertEquals(false, actionCloseCalled.get());
trigger.init();
assertEquals(true, actionInitCalled.get());
assertEquals(false, actionCloseCalled.get());
}
assertEquals(true, actionCloseCalled.get());
}
public static class AssertInitTriggerAction implements TriggerAction {
public AssertInitTriggerAction() {
actionConstructorCalled.set(true);
}
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
}
@Override
public void init() {
actionInitCalled.compareAndSet(false, true);
}
@Override
public String getName() {
return "";
}
@Override
public void process(TriggerEvent event, ActionContext actionContext) {
}
@Override
public void close() throws IOException {
actionCloseCalled.compareAndSet(false, true);
}
}
@Test
public void testListenerAcceptance() throws Exception {
Map<String, Object> props = createTriggerProps(0);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger")) {
trigger.configure(cluster.getLoader(), cluster, props);
trigger.setProcessor(noFirstRunProcessor);
String newNode = cluster.simAddNode();
trigger.run(); // starts tracking live nodes
// stop the newly created node
cluster.simRemoveNode(newNode, false);
AtomicInteger callCount = new AtomicInteger(0);
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setProcessor(event -> {
if (callCount.incrementAndGet() < 2) {
return false;
} else {
fired.compareAndSet(false, true);
return true;
}
});
trigger.run(); // first run should detect the lost node and fire immediately but listener isn't ready
assertEquals(1, callCount.get());
assertFalse(fired.get());
trigger.run(); // second run should again fire
assertEquals(2, callCount.get());
assertTrue(fired.get());
trigger.run(); // should not fire
assertEquals(2, callCount.get());
}
}
@Test
public void testRestoreState() throws Exception {
long waitForSeconds = 1 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(waitForSeconds);
String newNode = cluster.simAddNode();
// remove a node but update the trigger before the waitFor period expires
// and assert that the new trigger still fires
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger");
trigger.configure(cluster.getLoader(), cluster, props);
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
// stop the newly created node
cluster.simRemoveNode(newNode, false);
trigger.run(); // this run should detect the lost node
trigger.close(); // close the old trigger
try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name")) {
newTrigger.configure(cluster.getLoader(), cluster, props);
try {
newTrigger.restoreState(trigger);
fail("Trigger should only be able to restore state from an old trigger of the same name");
} catch (AssertionError e) {
// expected
}
}
try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger")) {
newTrigger.configure(cluster.getLoader(), cluster, props);
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
newTrigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
}
} else {
fail("NodeLostListener was fired more than once!");
}
return true;
});
newTrigger.restoreState(trigger); // restore state from the old trigger
int counter = 0;
do {
newTrigger.run();
timeSource.sleep(1000);
if (counter++ > 10) {
fail("Lost node was not discovered by trigger even after 10 seconds");
}
} while (!fired.get());
TriggerEvent nodeLostEvent = eventRef.get();
assertNotNull(nodeLostEvent);
@SuppressWarnings({"unchecked"})
List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
assertTrue(nodeNames.contains(newNode));
}
}
private Map<String, Object> createTriggerProps(long waitForSeconds) {
Map<String, Object> props = new HashMap<>();
props.put("event", "nodeLost");
props.put("waitFor", waitForSeconds);
props.put("enabled", true);
List<Map<String, String>> actions = new ArrayList<>(3);
Map<String, String> map = new HashMap<>(2);
map.put("name", "compute_plan");
map.put("class", "solr.ComputePlanAction");
actions.add(map);
map = new HashMap<>(2);
map.put("name", "execute_plan");
map.put("class", "solr.ExecutePlanAction");
actions.add(map);
props.put("actions", actions);
return props;
}
}