/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.solr.cloud.autoscaling;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("metricsEnabled", "true");
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
// disable .scheduled_maintenance (once it exists)
CloudTestUtils.waitForTriggerToBeScheduled(cluster.getOpenOverseer().getSolrCloudManager(), ".scheduled_maintenance");
CloudTestUtils.suspendTrigger(cluster.getOpenOverseer().getSolrCloudManager(), ".scheduled_maintenance");
}

/**
* Test that we can add/remove triggers to a scheduler, change the config on the fly, and still
* get the expected behavior.
*/
// commented out on: 17-Feb-2019 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
public void testSetProperties() throws Exception {
final JettySolrRunner runner = cluster.getJettySolrRunner(0);
final SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
final SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
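// NOTE: rather than configuring triggers through the autoscaling API, this test drives its own
// ScheduledTriggers instance directly (built from this node's resource loader and cloud manager),
// so it can swap AutoScalingConfig properties on the fly and observe the scheduling behavior.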
try (ScheduledTriggers scheduledTriggers = new ScheduledTriggers(resourceLoader, solrCloudManager)) {
AutoScalingConfig config = new AutoScalingConfig(Collections.emptyMap());
scheduledTriggers.setAutoScalingConfig(config);
// Set up a trigger that records a timestamp each time it runs.
// We only need 2 timestamps for the test, so bound the queue; offer() silently drops any further timestamps once it is full.
final BlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(2);
final AutoScaling.Trigger t1 = new MockTrigger(TriggerEventType.NODELOST, "mock-timestamper") {
@Override
public void run() {
if (log.isInfoEnabled()) {
log.info("Running {} in {}", this.getName(), Thread.currentThread().getName());
}
timestamps.offer(solrCloudManager.getTimeSource().getTimeNs());
}
};
if (log.isInfoEnabled()) {
log.info("Configuring simple scheduler and adding trigger: {}", t1.getName());
}
t1.configure(resourceLoader, solrCloudManager, Collections.emptyMap());
scheduledTriggers.add(t1);
waitForAndDiffTimestamps("conf(default delay)",
ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS,
timestamps);
if (log.isInfoEnabled()) {
log.info("Reconfiguring scheduler to use 4s delay and clearing queue for trigger: {}", t1.getName());
}
config = config.withProperties(Collections.singletonMap(
AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
scheduledTriggers.setAutoScalingConfig(config);
timestamps.clear();
waitForAndDiffTimestamps("conf(four sec delay)",
4, TimeUnit.SECONDS,
timestamps);
if (log.isInfoEnabled()) {
log.info("Removing trigger: {}", t1.getName());
}
scheduledTriggers.remove(t1.getName());
log.info("Reconfiguring scheduler to use default props");
config = config.withProperties(ScheduledTriggers.DEFAULT_PROPERTIES);
scheduledTriggers.setAutoScalingConfig(config);
assertTrue("Test sanity check, need default thread pool to be at least 3 so we can " +
"test lowering it by 2", ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE >= 3);
final int numTriggers = ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE;
final int reducedThreadPoolSize = numTriggers - 2;
// Set up X instances of a trigger, each of which:
// - records its name as having run
// - skips all remaining execution if its name has already been recorded
// - records the name of the thread that ran it
// - blocks on a cyclic barrier until Y instances have reached it (to hog a thread)
// ...to test that the scheduler will add new threads as needed, up to the configured limit
//
// NOTE: the reason we need X unique instances is because the scheduler won't "re-run" a single
// trigger while a previous "run" is still in progress
final List<AutoScaling.Trigger> triggerList = new ArrayList<>(numTriggers);
// Use a cyclic barrier gated by an atomic ref so we can swap it out later
final AtomicReference<CyclicBarrier> latch = new AtomicReference<>(new CyclicBarrier(numTriggers));
// variables for tracking state as we go
// NOTE: all reads/writes must be gated by synchronizing on the barrier (ref),
// so we can ensure we are reading a consistent view
final Set<String> threadNames = Collections.synchronizedSet(new LinkedHashSet<>());
final Set<String> triggerNames = Collections.synchronizedSet(new LinkedHashSet<>());
final AtomicLong fails = new AtomicLong(0);
// Use a semaphore to track when each trigger *finishes* so our test thread
// can know when to check & clear the tracking state
final Semaphore completionSemaphore = new Semaphore(numTriggers);
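// NOTE: the semaphore starts with numTriggers permits; one permit is acquired for each trigger as
// it is added below, and each trigger releases a permit only after it passes the barrier, so
// tryAcquire(numTriggers, ...) blocks until every trigger has completed a full (non-No-Op) run.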
for (int i = 0; i < numTriggers; i++) {
AutoScaling.Trigger trigger = new MockTrigger(TriggerEventType.NODELOST,
"mock-blocking-trigger-" + i) {
@Override
public void run() {
if (log.isInfoEnabled()) {
log.info("Running {} in {}", this.getName(), Thread.currentThread().getName());
}
CyclicBarrier barrier = null;
synchronized (latch) {
if (!triggerNames.add(this.getName())) {
if (log.isInfoEnabled()) {
log.info("{}: No-Op since we've already recorded a run", this.getName());
}
return;
}
threadNames.add(Thread.currentThread().getName());
barrier = latch.get();
}
try {
if (log.isInfoEnabled()) {
log.info("{}: waiting on barrier to hog a thread", this.getName());
}
barrier.await(30, TimeUnit.SECONDS);
completionSemaphore.release();
} catch (Exception e) {
fails.incrementAndGet();
log.error("{} : failure waiting on cyclic barrier: {}", this.getName(), e, e);
}
}
};
trigger.configure(resourceLoader, solrCloudManager, Collections.emptyMap());
triggerList.add(trigger);
completionSemaphore.acquire();
if (log.isInfoEnabled()) {
log.info("Adding trigger {} to scheduler", trigger.getName());
}
scheduledTriggers.add(trigger);
}
log.info("Waiting on semaphore for all triggers to signal completion...");
assertTrue("Timed out waiting for semaphore count to be released",
completionSemaphore.tryAcquire(numTriggers, 60, TimeUnit.SECONDS));
synchronized (latch) {
assertEquals("Unexpected number of trigger names found: " + triggerNames.toString(),
numTriggers, triggerNames.size());
assertEquals("Unexpected number of thread names found: " + threadNames.toString(),
numTriggers, threadNames.size());
assertEquals("Unexpected number of trigger fails recorded, check logs?",
0, fails.get());
// before releasing the latch, clear the state and update our config to use a lower number of threads
log.info("Updating scheduler config to use {} threads", reducedThreadPoolSize);
config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE,
reducedThreadPoolSize));
scheduledTriggers.setAutoScalingConfig(config);
log.info("Updating cyclic barrier and clearing test state so triggers will 'run' again");
latch.set(new CyclicBarrier(reducedThreadPoolSize));
threadNames.clear();
triggerNames.clear();
}
log.info("Waiting on semaphore for all triggers to signal completion...");
assertTrue("Timed out waiting for semaphore count to be released",
completionSemaphore.tryAcquire(numTriggers, 60, TimeUnit.SECONDS));
synchronized (latch) {
assertEquals("Unexpected number of trigger names found: " + triggerNames.toString(),
numTriggers, triggerNames.size());
assertEquals("Unexpected number of thread names found: " + threadNames.toString(),
reducedThreadPoolSize, threadNames.size());
assertEquals("Unexpected number of trigger fails recorded, check logs?",
0, fails.get());
}
}
}
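
/**
* Polls the queue for two timestamps (waiting up to 3x the expected delay for each) and asserts
* that the delta between them is at least the minimum expected delay.
*/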
private static final void waitForAndDiffTimestamps(final String label,
final long minExpectedDelta,
final TimeUnit minExpectedDeltaUnit,
final BlockingQueue<Long> timestamps) {
try {
log.info("{}: Waiting for 2 timestamps to be recorded", label);
Long firstTs = timestamps.poll(minExpectedDelta * 3, minExpectedDeltaUnit);
assertNotNull(label + ": Couldn't get first timestamp after max allowed polling", firstTs);
Long secondTs = timestamps.poll(minExpectedDelta * 3, minExpectedDeltaUnit);
assertNotNull(label + ": Couldn't get second timestamp after max allowed polling", secondTs);
final long deltaInNanos = secondTs - firstTs;
final long minExpectedDeltaInNanos = minExpectedDeltaUnit.toNanos(minExpectedDelta);
assertTrue(label + ": Delta between timestamps ("+secondTs+"ns - "+firstTs+"ns = "+deltaInNanos+"ns) is not " +
"at least as much as min expected delay: " + minExpectedDeltaInNanos + "ns",
deltaInNanos >= minExpectedDeltaInNanos);
} catch (InterruptedException e) {
log.error("{}: interrupted", label, e);
fail(label + ": interrupted: " + e.toString());
}
}
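
/**
* Minimal {@link TriggerBase} subclass that keeps no state; tests subclass it and override
* {@link #run()} with the behavior they want to exercise.
*/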
private static abstract class MockTrigger extends TriggerBase {
public MockTrigger(TriggerEventType eventType, String name) {
super(eventType, name);
}
@Override
protected Map<String, Object> getState() {
return Collections.emptyMap();
}
@Override
protected void setState(Map<String, Object> state) { /* No-Op */ }
@Override
public void restoreState(AutoScaling.Trigger old) { /* No-Op */ }
}
}