blob: 6368d050c6ea54c4b8a24e8804ea71c4dc0296e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.util;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.internal.util.AbortableTaskService.AbortableTask;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class AbortableTaskServiceJUnitTest {
private static final long TIMEOUT_SECONDS = 10;
private volatile CountDownLatch delay;
private AbortableTaskService tasks;
@Rule
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
@Before
public void setUp() {
this.delay = new CountDownLatch(1);
this.tasks = new AbortableTaskService(Executors.newSingleThreadExecutor());
}
@After
public void tearDown() {
if (this.delay != null && this.delay.getCount() > 0) {
this.delay.countDown();
}
this.tasks.abortAll();
}
@Test
public void testExecute() throws Exception {
DelayedTask dt = new DelayedTask();
this.tasks.execute(dt);
Future<Boolean> future = executorServiceRule.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
return tasks.isCompleted();
}
});
this.delay.countDown();
assertTrue(future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
assertFalse(dt.wasAborted.get());
assertTrue(dt.wasRun.get());
assertTrue(this.tasks.isCompleted());
}
@Test
public void testAbortDuringExecute() throws Exception {
DelayedTask dt = new DelayedTask();
this.tasks.execute(dt);
Future<Boolean> future = executorServiceRule.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
return tasks.isCompleted();
}
});
this.tasks.abortAll();
this.delay.countDown();
assertTrue(future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
assertTrue(dt.wasAborted.get());
// assertTrue(dt.wasRun.get()); -- race condition can result in true or false
assertTrue(this.tasks.isCompleted());
}
@Test
public void testAbortBeforeExecute() throws Exception {
// delay underlying call to execute(Runnable) until after abortAll() is invoked
Executor executor =
(Executor) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {Executor.class},
new DelayedExecutorHandler(Executors.newSingleThreadExecutor(), "execute"));
this.tasks = new AbortableTaskService(executor);
DelayedTask dt = new DelayedTask();
DelayedTask dt2 = new DelayedTask();
this.tasks.execute(dt);
this.tasks.execute(dt2);
Future<Boolean> future = executorServiceRule.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
return tasks.isCompleted();
}
});
this.tasks.abortAll();
this.delay.countDown();
assertTrue(future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
assertTrue(dt.wasAborted.get());
assertTrue(dt2.wasAborted.get());
assertFalse(dt.wasRun.get());
assertFalse(dt2.wasRun.get());
assertTrue(this.tasks.isCompleted());
}
/**
* AbortableTask that waits on the CountDownLatch proceeding.
*/
private class DelayedTask implements AbortableTask {
private final AtomicBoolean wasAborted = new AtomicBoolean(false);
private final AtomicBoolean wasRun = new AtomicBoolean(false);
@Override
public void runOrAbort(AtomicBoolean aborted) {
try {
delay.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.wasRun.set(true);
this.wasAborted.set(aborted.get());
}
@Override
public void abortBeforeRun() {
this.wasAborted.set(true);
}
}
/**
* Proxy handler which invokes methodName asynchronously AND delays the invocation to the
* underlying methodName until after CountDownLatch is opened.
*/
private class DelayedExecutorHandler implements InvocationHandler {
private final Executor executor;
private final String methodName;
private final Executor async;
public DelayedExecutorHandler(Executor executor, String methodName) {
this.executor = executor;
this.methodName = methodName;
this.async = Executors.newSingleThreadExecutor();
}
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
this.async.execute(new Runnable() {
@Override
public void run() {
try {
if (method.getName().equals(methodName)) {
delay.await();
}
method.invoke(executor, args);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new Error(e);
}
}
});
return null;
}
}
}