// Source-viewer residue (not part of the original file): blob 7a682edb2925dc532857bd37b27fe6dbdee5e593
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.concurrent;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FBUtilities;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.concurrent.DebuggableThreadPoolExecutorTest.checkLocalStateIsPropagated;
import static org.assertj.core.api.Assertions.assertThat;
public class SEPExecutorTest
{
/**
 * One-time fixture for this test class: calls
 * {@link DatabaseDescriptor#daemonInitialization()} before any test method runs.
 */
@BeforeClass
public static void beforeClass()
{
DatabaseDescriptor.daemonInitialization();
}
/**
 * Runs the create/use/shutdown cycle implemented by {@code shutdownOnce}
 * one thousand times, passing the repetition index to each cycle.
 *
 * @throws Throwable whatever a single cycle throws
 */
@Test
public void shutdownTest() throws Throwable
{
    int run = 0;
    while (run < 1000)
    {
        shutdownOnce(run);
        run++;
    }
}
/**
 * Performs one shutdown cycle: builds a fresh {@link SharedExecutorPool}, creates
 * twenty executors on it (each given a single task that prints to a discarding
 * stream), shuts the pool down, and then fails if any thread whose name contains
 * the marker string is still alive one second after being joined.
 *
 * @param run repetition index; it is prefixed to the marker in every executor name
 * @throws Throwable declared broadly; covers the checked exceptions of the pool
 *                   shutdown and of {@link Thread#join(long)}
 */
private static void shutdownOnce(int run) throws Throwable
{
    final String marker = "UNREPEATABLE_MAGIC_STRING";
    SharedExecutorPool pool = new SharedExecutorPool("SharedPool");

    // A sink that drops every byte, so the submitted tasks have something cheap to do.
    OutputStream blackHole = new OutputStream()
    {
        @Override
        public void write(int b)
        {
        }
    };

    try (PrintStream sink = new PrintStream(blackHole))
    {
        int idx = 0;
        while (idx < 20)
        {
            ExecutorService es = pool.newExecutor(FBUtilities.getAvailableProcessors(), "STAGE", run + marker + idx);
            // Write to black hole
            es.execute(() -> sink.println("TEST" + es));
            idx++;
        }
    }

    // shutdown does not guarantee that threads are actually dead once it exits,
    // only that they will stop promptly afterwards
    pool.shutdownAndWait(1L, TimeUnit.MINUTES);

    for (Thread candidate : Thread.getAllStackTraces().keySet())
    {
        // Only threads carrying the marker in their name are of interest.
        if (!candidate.getName().contains(marker))
            continue;

        candidate.join(1000);
        if (candidate.isAlive())
            Assert.fail(candidate + " is still running " + Arrays.toString(candidate.getStackTrace()));
    }
}
/**
 * Test helper that owns a {@link SharedExecutorPool} with a single executor and a
 * background thread that keeps submitting {@code BusyWork} tasks to that executor
 * until {@link #shutdown()} is called.
 */
private static class BusyExecutor
{
    // Number of busy worker threads to run and gum things up. Chosen to be
    // between the low and high max pool size so the test exercises resizing
    // under a number of different conditions.
    static final int numBusyWorkers = 2;

    // Last value handed to the callback registered with the executor in the constructor;
    // presumably the executor's new maximum pool size (confirm against SharedExecutorPool).
    final AtomicInteger notifiedMaxPoolSize = new AtomicInteger();
    SharedExecutorPool sharedPool;
    LocalAwareExecutorPlus executor;
    // Feeder thread created by start().
    Thread makeBusy;
    // Loop flag for the feeder thread; created by start(), cleared by shutdown().
    AtomicBoolean stayBusy;

    /**
     * Creates the shared pool and one executor on it, registering
     * {@code notifiedMaxPoolSize::set} as the executor's callback.
     *
     * @param poolName     name given to the {@link SharedExecutorPool}
     * @param executorName last name argument passed to {@code newExecutor}
     */
    public BusyExecutor(String poolName, String executorName)
    {
        sharedPool = new SharedExecutorPool(poolName);
        executor = sharedPool.newExecutor(0, notifiedMaxPoolSize::set, "internal", executorName);
    }

    /**
     * Starts the feeder thread. A semaphore with {@code numBusyWorkers} permits
     * bounds how many submitted tasks may be outstanding at once.
     */
    public void start()
    {
        stayBusy = new AtomicBoolean(true);
        Semaphore permits = new Semaphore(numBusyWorkers);
        makeBusy = new Thread(() -> feedWhileBusy(permits));
        makeBusy.start();
    }

    /**
     * Feeder loop: while {@code stayBusy} is set, waits up to one millisecond for a
     * permit and, when one is obtained, submits a task that will release it again.
     */
    private void feedWhileBusy(Semaphore permits)
    {
        // Keep feeding the executor work while resizing
        // so it stays under load.
        while (stayBusy.get())
        {
            try
            {
                if (!permits.tryAcquire(1, MILLISECONDS))
                    continue;
                executor.execute(new BusyWork(permits));
            }
            catch (InterruptedException e)
            {
                // ignore, will either stop looping if done or retry the lock
            }
        }
    }

    /**
     * Stops the feeder thread (waiting up to five seconds for it to exit and
     * asserting that it did), then shuts down the shared pool with a one minute limit.
     *
     * @throws TimeoutException     declared for the pool shutdown call
     * @throws InterruptedException if interrupted while joining or while waiting for the pool
     */
    public void shutdown() throws TimeoutException, InterruptedException
    {
        stayBusy.set(false);
        long joinMillis = TimeUnit.SECONDS.toMillis(5);
        makeBusy.join(joinMillis);
        boolean feederAlive = makeBusy.isAlive();
        Assert.assertFalse("makeBusy thread should have checked stayBusy and exited", feederAlive);
        sharedPool.shutdownAndWait(1L, MINUTES);
    }

    /** @return the executor created in the constructor */
    public LocalAwareExecutorPlus getExecutor()
    {
        return executor;
    }

    /** @return the most recent value delivered to the pool-size callback */
    public int getNotifiedMaxPoolSize()
    {
        return notifiedMaxPoolSize.get();
    }
}
/**
 * While a {@code BusyExecutor} keeps its executor under load, repeatedly resizes
 * the executor through the sequence 1, 2, 1, 3, 0, 4. After every resize the value
 * reported through the pool-size callback must match; for every non-zero size the
 * executor must also reach that many concurrently running tasks.
 *
 * @throws InterruptedException if interrupted while waiting on a latch or during shutdown
 * @throws TimeoutException     declared for the helper's shutdown
 */
@Test
public void changingMaxWorkersMeetsConcurrencyGoalsTest() throws InterruptedException, TimeoutException
{
    BusyExecutor busy = new BusyExecutor("ChangingMaxWorkersMeetsConcurrencyGoalsTest", "resizetest");
    LocalAwareExecutorPlus executor = busy.getExecutor();
    busy.start();
    try
    {
        // Sizes exercised before the pool is dropped to zero.
        final int[] sizesBeforeZero = { 1, 2, 1, 3 };
        for (int repeat = 0; repeat < 1000; repeat++)
        {
            for (int size : sizesBeforeZero)
            {
                assertMaxTaskConcurrency(executor, size);
                Assert.assertEquals(size, busy.getNotifiedMaxPoolSize());
            }

            // Drop to zero (only the notification is checked), then grow again.
            executor.setMaximumPoolSize(0);
            Assert.assertEquals(0, busy.getNotifiedMaxPoolSize());

            assertMaxTaskConcurrency(executor, 4);
            Assert.assertEquals(4, busy.getNotifiedMaxPoolSize());
        }
    }
    finally
    {
        busy.shutdown();
    }
}
/**
 * Checks that an executor whose maximum pool size was set to zero picks work up
 * again once the size is raised: for each of 25 rounds the size goes 3, then 0
 * (held for 250 ms), then 4, and both the reported size and the achieved task
 * concurrency are asserted.
 *
 * @throws InterruptedException if interrupted while sleeping, waiting on a latch, or during shutdown
 * @throws TimeoutException     declared for the helper's shutdown
 */
@Test
public void stoppedWorkersProcessTasksWhenConcurrencyIncreases() throws InterruptedException, TimeoutException
{
    BusyExecutor busyExecutor = new BusyExecutor("StoppedWorkersProcessTasksWhenConcurrencyIncreases", "stoptest");
    LocalAwareExecutorPlus executor = busyExecutor.getExecutor();
    busyExecutor.start();
    try
    {
        for (int repeat = 0; repeat < 25; repeat++)
        {
            assertMaxTaskConcurrency(executor, 3);
            Assert.assertEquals(3, busyExecutor.getNotifiedMaxPoolSize());

            executor.setMaximumPoolSize(0);
            Assert.assertEquals(0, busyExecutor.getNotifiedMaxPoolSize());

            // Stay at zero for a while before growing again.
            Thread.sleep(250);

            assertMaxTaskConcurrency(executor, 4);
            Assert.assertEquals(4, busyExecutor.getNotifiedMaxPoolSize());
        }
    }
    finally
    {
        // Shut down through the helper rather than the bare executor: this stops the
        // feeder thread first and then the whole shared pool, so neither outlives the
        // test. Shutting down only the executor left the feeder thread running.
        busyExecutor.shutdown();
    }
}
/**
 * Task that counts a shared latch down once and then waits, up to the configured
 * timeout, for that latch to reach zero. When as many of these tasks as the latch
 * count run at the same time, they all release each other.
 */
static class LatchWaiter implements Runnable
{
    CountDownLatch latch;
    long timeout;
    TimeUnit unit;

    /**
     * @param latch   latch to count down and then wait on
     * @param timeout maximum time to wait for the latch to reach zero
     * @param unit    unit of {@code timeout}
     */
    public LatchWaiter(CountDownLatch latch, long timeout, TimeUnit unit)
    {
        this.unit = unit;
        this.timeout = timeout;
        this.latch = latch;
    }

    /**
     * Counts down, then blocks until the latch hits zero or the timeout elapses
     * (the outcome of the wait is not inspected). An interrupt is reported via
     * {@code Assert.fail}.
     */
    @Override
    public void run()
    {
        latch.countDown();
        try
        {
            // block until all the latch waiters have run, now at desired concurrency
            latch.await(timeout, unit);
        }
        catch (InterruptedException interrupted)
        {
            Assert.fail("interrupted: " + interrupted);
        }
    }
}
/**
 * Minimal task whose only effect is to hand one permit back to the semaphore it
 * was constructed with.
 */
static class BusyWork implements Runnable
{
    private final Semaphore busyWorkers;

    /**
     * @param busyWorkers semaphore that receives one permit each time this task runs
     */
    public BusyWork(Semaphore busyWorkers)
    {
        this.busyWorkers = busyWorkers;
    }

    /** Releases a single permit. */
    @Override
    public void run()
    {
        busyWorkers.release();
    }
}
/**
 * Sets the executor's maximum pool size to {@code concurrency}, submits that many
 * {@code LatchWaiter} tasks sharing one latch, and asserts that the latch reaches
 * zero within three seconds, which requires every submitted task to have started.
 *
 * @param executor    executor to resize and submit to
 * @param concurrency pool size to set and number of tasks expected to run together
 * @throws InterruptedException if interrupted while waiting on the latch
 */
void assertMaxTaskConcurrency(LocalAwareExecutorPlus executor, int concurrency) throws InterruptedException
{
    executor.setMaximumPoolSize(concurrency);

    CountDownLatch concurrencyGoal = new CountDownLatch(concurrency);
    int submitted = 0;
    while (submitted < concurrency)
    {
        // Each waiter holds its worker for up to five seconds, longer than the wait below.
        executor.execute(new LatchWaiter(concurrencyGoal, 5L, TimeUnit.SECONDS));
        submitted++;
    }

    // True only if all of the LatchWaiters count down before the timeout
    boolean goalReached = concurrencyGoal.await(3L, TimeUnit.SECONDS);
    Assert.assertTrue("Test tasks did not hit max concurrency goal", goalReached);
}
/**
 * Creates a single-worker executor on a fresh pool and delegates to
 * {@code DebuggableThreadPoolExecutorTest.checkLocalStateIsPropagated} to verify
 * it; the pool is always shut down afterwards with a one second limit.
 *
 * @throws InterruptedException if interrupted while waiting for the pool to shut down
 * @throws TimeoutException     declared for the pool shutdown call
 */
@Test
public void testLocalStatePropagation() throws InterruptedException, TimeoutException
{
    SharedExecutorPool pool = new SharedExecutorPool("TestPool");
    try
    {
        LocalAwareExecutorPlus localAware = pool.newExecutor(1, "TEST", "TEST");
        assertThat(localAware).isInstanceOf(LocalAwareExecutorPlus.class);
        checkLocalStateIsPropagated(localAware);
    }
    finally
    {
        pool.shutdownAndWait(1, TimeUnit.SECONDS);
    }
}
}