blob: 902f4b7fb84c3015bbe8beb7d3931577e16cc1fc [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.util;
import static org.junit.Assert.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.internal.util.AbortableTaskService.AbortableTask;
import com.gemstone.gemfire.test.junit.categories.UnitTest;
@Category(UnitTest.class)
public class AbortableTaskServiceJUnitTest {
private static final long TIMEOUT_SECONDS = 10;
private volatile CountDownLatch delay;
private ExecutorService futures;
private AbortableTaskService tasks;
@Before
public void setUp() {
this.delay = new CountDownLatch(1);
this.tasks = new AbortableTaskService(Executors.newSingleThreadExecutor());
this.futures = Executors.newSingleThreadExecutor();
}
@After
public void tearDown() {
if (this.delay != null && this.delay.getCount() > 0) {
this.delay.countDown();
}
this.tasks.abortAll();
assertTrue(this.futures.shutdownNow().isEmpty());
}
@Test
public void testExecute() throws Exception {
DelayedTask dt = new DelayedTask();
this.tasks.execute(dt);
Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
return tasks.isCompleted();
}
});
this.delay.countDown();
assertTrue(future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
assertFalse(dt.wasAborted.get());
assertTrue(dt.wasRun.get());
assertTrue(this.tasks.isCompleted());
}
@Test
public void testAbortDuringExecute() throws Exception {
DelayedTask dt = new DelayedTask();
this.tasks.execute(dt);
Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
return tasks.isCompleted();
}
});
this.tasks.abortAll();
this.delay.countDown();
assertTrue(future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
assertTrue(dt.wasAborted.get());
//assertTrue(dt.wasRun.get()); -- race condition can result in true or false
assertTrue(this.tasks.isCompleted());
}
@Test
public void testAbortBeforeExecute() throws Exception {
// delay underlying call to execute(Runnable) until after abortAll() is invoked
Executor executor = (Executor) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {Executor.class}, new DelayedExecutorHandler(Executors.newSingleThreadExecutor(), "execute"));
this.tasks = new AbortableTaskService(executor);
DelayedTask dt = new DelayedTask();
DelayedTask dt2 = new DelayedTask();
this.tasks.execute(dt);
this.tasks.execute(dt2);
Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
return tasks.isCompleted();
}
});
this.tasks.abortAll();
this.delay.countDown();
assertTrue(future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
assertTrue(dt.wasAborted.get());
assertTrue(dt2.wasAborted.get());
assertFalse(dt.wasRun.get());
assertFalse(dt2.wasRun.get());
assertTrue(this.tasks.isCompleted());
}
/**
* AbortableTask that waits on the CountDownLatch proceeding.
*/
private class DelayedTask implements AbortableTask {
private final AtomicBoolean wasAborted = new AtomicBoolean(false);
private final AtomicBoolean wasRun = new AtomicBoolean(false);
@Override
public void runOrAbort(AtomicBoolean aborted) {
try {
delay.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.wasRun.set(true);
this.wasAborted.set(aborted.get());
}
@Override
public void abortBeforeRun() {
this.wasAborted.set(true);
}
}
/**
* Proxy handler which invokes methodName asynchronously AND delays the
* invocation to the underlying methodName until after CountDownLatch is
* opened.
*
* @author Kirk Lund
*/
private class DelayedExecutorHandler implements InvocationHandler {
private final Executor executor;
private final String methodName;
private final Executor async;
public DelayedExecutorHandler(Executor executor, String methodName) {
this.executor = executor;
this.methodName = methodName;
this.async = Executors.newSingleThreadExecutor();
}
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
this.async.execute(new Runnable() {
public void run() {
try {
if (method.getName().equals(methodName)) {
delay.await();
}
method.invoke(executor, args);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new Error(e);
}
}
});
return null;
}
}
}