blob: e2101adafefc4468cba7cd60a505b2442ce95ac3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
import org.apache.drill.test.DrillTest;
import org.junit.Test;
/** Tests for validating the Drill executor service utility class */
public final class ExecutorServiceUtilTest extends DrillTest {
@Test
public void testSuccessfulExecution() throws Exception {
final int numThreads = 2;
final int numTasks = 20;
ExecutorService service = Executors.newFixedThreadPool(numThreads);
List<RequestContainer> requests = new ArrayList<>(numTasks);
// Set the test parameters (using the default values)
TestParams params = new TestParams();
// Launch the tasks
for (int idx = 0; idx < numTasks; idx++) {
CallableTask task = new CallableTask(params);
Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
requests.add(new RequestContainer(future, task));
}
int numSuccess = 0;
// Wait for the tasks to finish
for (int idx = 0; idx < numTasks; idx++) {
RequestContainer request = requests.get(idx);
try {
TaskResult result = request.future.get();
assertNotNull(result);
if (result.isSuccess()) {
++numSuccess;
}
} catch (Exception e) {
// NOOP
}
}
assertEquals(numTasks, numSuccess);
}
@Test
public void testFailedExecution() throws Exception {
final int numThreads = 2;
final int numTasks = 20;
ExecutorService service = Executors.newFixedThreadPool(numThreads);
List<RequestContainer> requests = new ArrayList<>(numTasks);
// Set the test parameters
TestParams params = new TestParams();
params.generateException = true;
// Launch the tasks
for (int idx = 0; idx < numTasks; idx++) {
CallableTask task = new CallableTask(params);
Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
requests.add(new RequestContainer(future, task));
}
int numSuccess = 0;
int numFailures = 0;
// Wait for the tasks to finish
for (int idx = 0; idx < numTasks; idx++) {
RequestContainer request = requests.get(idx);
try {
TaskResult result = request.future.get();
assertNotNull(result);
if (result.isSuccess()) {
++numSuccess;
}
} catch (Exception e) {
assertTrue(request.task.result.isFailed());
++numFailures;
}
}
assertEquals(0, numSuccess);
assertEquals(numTasks, numFailures);
}
@Test
public void testMixedExecution() throws Exception {
final int numThreads = 2;
final int numTasks = 20;
ExecutorService service = Executors.newFixedThreadPool(numThreads);
List<RequestContainer> requests = new ArrayList<>(numTasks);
// Set the test parameters
TestParams successParams = new TestParams();
TestParams failedParams = new TestParams();
failedParams.generateException = true;
int expNumFailedTasks = 0;
int expNumSuccessTasks = 0;
// Launch the tasks
for (int idx = 0; idx < numTasks; idx++) {
CallableTask task = null;
if (idx % 2 == 0) {
task = new CallableTask(successParams);
++expNumSuccessTasks;
} else {
task = new CallableTask(failedParams);
++expNumFailedTasks;
}
Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
requests.add(new RequestContainer(future, task));
}
int numSuccess = 0;
int numFailures = 0;
// Wait for the tasks to finish
for (int idx = 0; idx < numTasks; idx++) {
RequestContainer request = requests.get(idx);
try {
TaskResult result = request.future.get();
assertNotNull(result);
if (result.isSuccess()) {
++numSuccess;
}
} catch (Exception e) {
assertTrue(request.task.result.isFailed());
++numFailures;
}
}
assertEquals(expNumSuccessTasks, numSuccess);
assertEquals(expNumFailedTasks, numFailures);
}
@Test
public void testCancelExecution() throws Exception {
final int numThreads = 2;
ExecutorService service = Executors.newFixedThreadPool(numThreads);
RequestContainer request = null;
// Set the test parameters
TestParams params = new TestParams();
params.controller = new TaskExecutionController();
// Launch the task
CallableTask task = new CallableTask(params);
Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
request = new RequestContainer(future, task);
// Allow the task to start
params.controller.start();
params.controller.hasStarted();
// Allow the task to exit but with a delay so that we can test the blocking nature of "cancel"
params.controller.delayMillisOnExit = 50;
params.controller.exit();
// Cancel the task
boolean result = request.future.cancel(true);
if (result) {
// We were able to cancel this task; let's make sure that it is done now that the current thread is
// unblocked
assertTrue(task.result.isCancelled());
} else {
// Cancellation could't happen most probably because this thread got context switched for
// for a long time (should be rare); let's make sure the task is done and successful
assertTrue(task.result.isSuccess());
}
}
// ----------------------------------------------------------------------------
// Internal Classes
// ----------------------------------------------------------------------------
@SuppressWarnings("unused")
private static final class TaskResult {
private enum ExecutionStatus {
NOT_STARTED,
RUNNING,
SUCCEEDED,
FAILED,
CANCELLED
};
private ExecutionStatus status;
TaskResult() {
status = ExecutionStatus.NOT_STARTED;
}
private boolean isSuccess() {
return status.equals(ExecutionStatus.SUCCEEDED);
}
private boolean isFailed() {
return status.equals(ExecutionStatus.FAILED);
}
private boolean isCancelled() {
return status.equals(ExecutionStatus.CANCELLED);
}
private boolean isFailedOrCancelled() {
return status.equals(ExecutionStatus.CANCELLED)
|| status.equals(ExecutionStatus.FAILED);
}
}
@SuppressWarnings("unused")
private static final class TaskExecutionController {
private boolean canStart = false;
private boolean canExit = false;
private boolean started = false;
private boolean exited = false;
private int delayMillisOnExit = 0;
private Object monitor = new Object();
private void canStart() {
synchronized(monitor) {
while (!canStart) {
try {
monitor.wait();
} catch (InterruptedException ie) {
// NOOP
}
}
started = true;
monitor.notify();
}
}
private void canExit() {
synchronized(monitor) {
while (!canExit) {
try {
monitor.wait();
} catch (InterruptedException ie) {
// NOOP
}
}
}
// Wait requested delay time before exiting
for (int i = 0; i < delayMillisOnExit; i++) {
try {
Thread.sleep(1); // sleep 1 ms
} catch (InterruptedException ie) {
// NOOP
}
}
synchronized(monitor) {
exited = true;
monitor.notify();
}
}
private void start() {
synchronized(monitor) {
canStart = true;
monitor.notify();
}
}
private void exit() {
synchronized(monitor) {
canExit = true;
monitor.notify();
}
}
private void hasStarted() {
synchronized(monitor) {
while (!started) {
try {
monitor.wait();
} catch (InterruptedException ie) {
// NOOP
}
}
}
}
private void hasExited() {
synchronized(monitor) {
while (!exited) {
try {
monitor.wait();
} catch (InterruptedException ie) {
// NOOP
}
}
}
}
}
private static final class TestParams {
private int waitTimeMillis = 2;
private boolean generateException = false;
private TaskExecutionController controller = null;
}
private static final class CallableTask implements Callable<TaskResult> {
private volatile TaskResult result = new TaskResult();
private final TestParams params;
private CallableTask(TestParams params) {
this.params = params;
}
@Override
public TaskResult call() throws Exception {
beforeStart();
result.status = TaskResult.ExecutionStatus.RUNNING;
boolean interrupted = false;
Exception exc = null;
try {
for (int i = 0; i < params.waitTimeMillis; i++) {
try {
Thread.sleep(1); // sleep 1 ms
} catch (InterruptedException ie) {
interrupted = true;
}
}
if (params.generateException) {
throw new RuntimeException("Test emulated exception..");
}
} catch (Exception e) {
exc = e;
throw e;
} finally {
beforeExit();
if (interrupted) {
result.status = TaskResult.ExecutionStatus.CANCELLED;
} else if (exc != null) {
result.status = TaskResult.ExecutionStatus.FAILED;
} else {
result.status = TaskResult.ExecutionStatus.SUCCEEDED;
}
}
return result;
}
private void beforeStart() {
if (params.controller != null) {
params.controller.canStart();
}
}
private void beforeExit() {
if (params.controller != null) {
params.controller.canExit();
}
}
}
private static final class RequestContainer {
private final Future<TaskResult> future;
private final CallableTask task;
private RequestContainer(Future<TaskResult> future, CallableTask task) {
this.future = future;
this.task = task;
}
}
}