blob: 002769f8dfa9bf3992af0097d94954368285d6eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.diff;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import com.google.common.util.concurrent.*;
import org.junit.Test;
import com.codahale.metrics.*;
import static org.apache.cassandra.diff.TestUtils.assertThreadWaits;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
public class ComparisonExecutorTest {
@Test
public void submitBlocksWhenMaxTasksExceeded() throws Exception {
// submit maxTasks, then assert that further submission blocks until tasks are processed
int maxTasks = 3;
MetricRegistry metrics = metrics();
final ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
Gauge waitingToSubmit = metrics.getGauges().get("BlockedTasks");
assertEquals(0, waitingToSubmit.getValue());
final AtomicInteger successful = new AtomicInteger(0);
final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
final AtomicInteger failed = new AtomicInteger(0);
final Consumer<Throwable> onError = (t) -> failed.incrementAndGet();
final Phaser phaser = new Phaser(1);
BlockingTask[] tasks = new BlockingTask[5];
for (int i=0; i<5; i++)
tasks[i] = new BlockingTask(i);
// Ensure that the submission itself does not block before the max number of tasks are submitted
executor.submit(tasks[0], onSuccess, onError, phaser);
executor.submit(tasks[1], onSuccess, onError, phaser);
executor.submit(tasks[2], onSuccess, onError, phaser);
assertEquals(0, waitingToSubmit.getValue());
// Now submit another pair of tasks which should block as the executor is fully occupied
final CountDownLatch latch = new CountDownLatch(2);
Thread t1 = new Thread(() -> { latch.countDown(); executor.submit(tasks[3], onSuccess, onError, phaser);});
Thread t2 = new Thread(() -> { latch.countDown(); executor.submit(tasks[4], onSuccess, onError, phaser);});
t1.start();
t2.start();
// wait for both to attempt submission
latch.await();
assertThreadWaits(t1);
assertThreadWaits(t2);
assertEquals(2, waitingToSubmit.getValue());
// Let the first waiting task complete, which should allow t1 to complete its submission
tasks[0].latch.countDown();
t1.join();
// the second submission should still be waiting on a slot
assertThreadWaits(t2);
assertEquals(1, waitingToSubmit.getValue());
// Let another task complete, allowing t2 to complete its submission
tasks[1].latch.countDown();
t2.join();
assertEquals(0, waitingToSubmit.getValue());
// Let all tasks complete, wait for them to do so then verify counters
for (int i=2; i<=4; i++)
tasks[i].latch.countDown();
phaser.arriveAndAwaitAdvance();
assertEquals(5, successful.get());
assertEquals(0, failed.get());
}
@Test
public void handleTaskFailure() {
// Ensure that the failure callback is fired, a permit for task submission
// returned and the phaser notified when a task throws
int maxTasks = 5;
MetricRegistry metrics = metrics();
ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
final AtomicInteger successful = new AtomicInteger(0);
final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
AtomicReference<Throwable> thrown = new AtomicReference<>();
final Consumer<Throwable> onError = thrown::set;
final Phaser phaser = new Phaser(1);
assertEquals(maxTasks, availableSlots.getValue());
RuntimeException toThrow = new RuntimeException("FAIL");
BlockingTask task = new BlockingTask(0, toThrow);
executor.submit(task, onSuccess, onError, phaser);
assertEquals(maxTasks - 1, availableSlots.getValue());
assertEquals(2, phaser.getUnarrivedParties());
task.latch.countDown();
phaser.arriveAndAwaitAdvance();
assertEquals(maxTasks, availableSlots.getValue());
assertEquals(0, successful.get());
assertEquals(toThrow, thrown.get());
}
@Test
public void handleUncaughtExceptionInFailureCallback() {
// Ensure that if the failure callback throws, a permit for submission is
// still returned and the phaser notified
int maxTasks = 5;
MetricRegistry metrics = metrics();
ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
final AtomicInteger successful = new AtomicInteger(0);
final AtomicInteger failures = new AtomicInteger(0);
final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
final Consumer<Throwable> onError = (t) -> { failures.incrementAndGet(); throw new RuntimeException("UNCAUGHT"); };
final Phaser phaser = new Phaser(1);
assertEquals(maxTasks, availableSlots.getValue());
RuntimeException toThrow = new RuntimeException("FAIL");
try {
onError.accept(toThrow);
fail("Failure callback should throw RuntimeException");
} catch (RuntimeException e) {
// expected - reset failure count
failures.set(0);
}
BlockingTask task = new BlockingTask(0, toThrow);
executor.submit(task, onSuccess, onError, phaser);
assertEquals(maxTasks - 1, availableSlots.getValue());
assertEquals(2, phaser.getUnarrivedParties());
task.latch.countDown();
phaser.arriveAndAwaitAdvance();
assertEquals(maxTasks, availableSlots.getValue());
assertEquals(0, successful.get());
assertEquals(1, failures.get());
}
@Test
public void handleUncaughtExceptionInSuccessCallback() {
// Ensure that if the success callback throws, a permit for submission is
// still returned and the phaser notified
int maxTasks = 5;
MetricRegistry metrics = metrics();
ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
final AtomicInteger successful = new AtomicInteger(0);
final AtomicInteger failures = new AtomicInteger(0);
final Consumer<Integer> onSuccess = (i) -> { successful.incrementAndGet(); throw new RuntimeException("UNCAUGHT"); };
final Consumer<Throwable> onError = (t) -> failures.incrementAndGet();
final Phaser phaser = new Phaser(1);
assertEquals(maxTasks, availableSlots.getValue());
try {
onSuccess.accept(0);
fail("Success callback should throw RuntimeException");
} catch (RuntimeException e) {
// expected - reset failure count
successful.set(0);
}
BlockingTask task = new BlockingTask(0);
executor.submit(task, onSuccess, onError, phaser);
assertEquals(maxTasks - 1, availableSlots.getValue());
assertEquals(2, phaser.getUnarrivedParties());
task.latch.countDown();
phaser.arriveAndAwaitAdvance();
assertEquals(maxTasks, availableSlots.getValue());
assertEquals(1, successful.get());
assertEquals(0, failures.get());
}
@Test
public void handleRejectedExecutionException() {
// In the case that the underlying ExecutorService rejects a task submission, a permit
// should be returned and the phaser notified
int maxTasks = 5;
final AtomicInteger successful = new AtomicInteger(0);
final AtomicInteger failures = new AtomicInteger(0);
final AtomicInteger rejections = new AtomicInteger(0);
final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
final Consumer<Throwable> onError = (t) -> failures.incrementAndGet();
MetricRegistry metrics = metrics();
ExecutorService rejectingExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1),
(r, executor) -> { rejections.incrementAndGet();
throw new RejectedExecutionException("REJECTED");});
ComparisonExecutor executor = new ComparisonExecutor(MoreExecutors.listeningDecorator(rejectingExecutor), maxTasks, metrics);
Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
final Phaser phaser = new Phaser(1);
// Submit an initial pair of tasks to ensure that the underlying work queue is full
BlockingTask t0 = new BlockingTask(0);
BlockingTask t1 = new BlockingTask(1);
executor.submit(t0, onSuccess, onError, phaser);
executor.submit(t1, onSuccess, onError, phaser);
assertEquals(3, phaser.getUnarrivedParties());
assertEquals(maxTasks - 2, availableSlots.getValue());
// Submit a third task which will be rejected by the executor service
executor.submit(new BlockingTask(2), onSuccess, onError, phaser);
t0.latch.countDown();
t1.latch.countDown();
phaser.arriveAndAwaitAdvance();
assertEquals(maxTasks, availableSlots.getValue());
assertEquals(2, successful.get());
assertEquals(1, failures.get());
assertEquals(1, rejections.get());
}
class BlockingTask implements Callable<Integer> {
final int id;
final Exception e;
final CountDownLatch latch;
BlockingTask(int id) {
this(id, null);
}
BlockingTask(int id, Exception toThrow) {
this.id = id;
this.e = toThrow;
this.latch = new CountDownLatch(1);
}
public Integer call() throws Exception {
latch.await();
if (e != null)
throw e;
return id;
}
}
private static ListeningExecutorService executor(int threads) {
return MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setNameFormat("partition-comparison-%d")
.setDaemon(true)
.build()));
}
private static MetricRegistry metrics() {
return new MetricRegistry();
}
}