blob: 67a4535e78fa6247cf9109e6696067b40332c07c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.oozie.service.AsyncXCommandExecutor.AccessibleRunnableScheduledFuture;
import org.apache.oozie.service.AsyncXCommandExecutor.PriorityComparator;
import org.apache.oozie.service.AsyncXCommandExecutor.ScheduledXCallable;
import org.apache.oozie.service.CallableQueueService.CallableWrapper;
import org.apache.oozie.util.XCallable;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Sets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.times;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@RunWith(MockitoJUnitRunner.class)
@SuppressWarnings("deprecation")
public class TestAsyncXCommandExecutor {
private static final String DEFAULT_TYPE = "test";
private static final int DEFAULT_MAX_ACTIVE_COMMANDS = 5;
private static final boolean DEFAULT_ENABLE_CONCURRENCY_CHECK = true;
private static final long DEFAULT_MAXWAIT = 30_000;
private static final int TEST_PRIORITIES = 5;
private static final int MAX_PRIORITY = TEST_PRIORITIES - 1;
private static final int AWAIT_TERMINATION_TIMEOUT_SECONDS = 1;
@Mock
private ThreadPoolExecutor executor;
@Mock
private ScheduledThreadPoolExecutor scheduledExecutor;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private CallableWrapper<?> callableWrapper;
@Mock
private CallableQueueService callableQueueService;
private PriorityBlockingQueue<CallableWrapper<?>> priorityBlockingQueue;
private BlockingQueue<AccessibleRunnableScheduledFuture<ScheduledXCallable>> delayQueue;
private ConcurrentHashMap<String, Set<CallableWrapper<?>>> pendingCommandsPerType;
private AtomicInteger activeCommands;
private AsyncXCommandExecutor asyncExecutor;
@Before
public void setup() {
activeCommands = new AtomicInteger(0);
priorityBlockingQueue = new PriorityBlockingQueue<>(100, new PriorityComparator());
pendingCommandsPerType = new ConcurrentHashMap<>();
delayQueue = new LinkedBlockingQueue<>(); // in reality it's not LBQ, but it's fine here
asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT,
TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
when(callableWrapper.filterDuplicates()).thenReturn(true);
when(callableWrapper.getElement().getKey()).thenReturn("key");
when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE);
}
@Test
public void testSubmitCallableWithNoDelay() {
boolean result = asyncExecutor.queue(callableWrapper, false);
verify(executor).execute(same(callableWrapper));
verifyZeroInteractions(scheduledExecutor);
assertEquals("Active commands", 1, asyncExecutor.getSize());
assertTrue("Queuing result", result);
}
@Test
public void testSubmitCallableWithDelay() {
when(callableWrapper.getInitialDelay()).thenReturn(111L);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(222L);
boolean result = asyncExecutor.queue(callableWrapper, false);
verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(222L), eq(TimeUnit.MILLISECONDS));
verifyZeroInteractions(executor);
assertEquals("Active commands", 1, asyncExecutor.getSize());
assertTrue("Queuing result", result);
}
@Test
public void testSubmissionSuccessfulAfterDelay() {
when(callableWrapper.getInitialDelay()).thenReturn(100L);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
when(callableQueueService.canSubmitCallable(any(XCallable.class))).thenReturn(true);
configureMockScheduler();
asyncExecutor.queue(callableWrapper, false);
verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
eq(TimeUnit.MILLISECONDS));
verify(executor).execute(callableWrapper);
}
@Test
public void testSubmissionFailsAfterDelay() {
when(callableWrapper.getInitialDelay()).thenReturn(100L);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
configureMockScheduler();
asyncExecutor.queue(callableWrapper, false);
verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
eq(TimeUnit.MILLISECONDS));
verifyZeroInteractions(executor);
}
@Test
public void testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() {
asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
when(callableWrapper.getInitialDelay()).thenReturn(100L);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
configureMockScheduler();
asyncExecutor.queue(callableWrapper, false);
verify(scheduledExecutor).schedule(any(ScheduledXCallable.class), eq(50L),
eq(TimeUnit.MILLISECONDS));
verify(executor).execute(eq(callableWrapper));
}
@Test
public void testCannotSubmitDueToFiltering() {
when(callableWrapper.filterDuplicates()).thenReturn(false);
boolean result = asyncExecutor.queue(callableWrapper, false);
verifyZeroInteractions(scheduledExecutor);
verifyZeroInteractions(executor);
assertEquals("Active commands", 0, asyncExecutor.getSize());
assertTrue("Queuing result", result);
}
@Test
public void testExceptionThrownDuringSubmission() {
doThrow(new RuntimeException()).when(executor).execute(any(Runnable.class));
boolean exceptionThrown = false;
try {
asyncExecutor.queue(callableWrapper, false);
} catch (RuntimeException e) {
exceptionThrown = true;
}
assertTrue("Exception was not thrown", exceptionThrown);
verify(callableWrapper).removeFromUniqueCallables();
verifyZeroInteractions(scheduledExecutor);
}
@Test
public void testSubmitWithNegativePriority() {
testIllegalPriority(-1);
}
@Test
public void testSubmitWithTooHighPriority() {
testIllegalPriority(MAX_PRIORITY + 1);
}
@Test
public void testQueueSizeWhenCommandIsFinished() {
CallableWrapper<?> delayedCommand = mock(CallableWrapper.class);
when(delayedCommand.getInitialDelay()).thenReturn(100L);
when(delayedCommand.filterDuplicates()).thenReturn(true);
asyncExecutor.queue(callableWrapper, false);
asyncExecutor.queue(delayedCommand, false);
int sizeAfterQueue = asyncExecutor.getSize();
asyncExecutor.commandFinished();
asyncExecutor.commandFinished();
assertEquals("Size after queue", 2, sizeAfterQueue);
assertEquals("Active commands", 0, asyncExecutor.getSize());
}
@Test
public void testQueueSizeWhenQueueIsFullDuringMaxConcurrencyCheck() {
XCallable<?> callable = mock(XCallable.class);
when(callable.getType()).thenReturn(DEFAULT_TYPE);
Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
asyncExecutor.handleConcurrencyExceeded(callableWrapper);
activeCommands.set(20);
asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
assertEquals("Active commands", 19, activeCommands.get());
}
@Test
public void testSubmissionWhenQueueIsFull() {
asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
when(callableWrapper.filterDuplicates()).thenReturn(true);
when(callableWrapper.getElement().getKey()).thenReturn("key");
asyncExecutor.queue(callableWrapper, false);
asyncExecutor.queue(callableWrapper, false);
boolean finalResult = asyncExecutor.queue(callableWrapper, false);
assertFalse("Last submission shouldn't have succeeded", finalResult);
verify(executor, times(2)).execute(same(callableWrapper));
}
@Test
public void testSubmissionWhenQueueSizeIsIgnored() {
asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, TEST_PRIORITIES, AWAIT_TERMINATION_TIMEOUT_SECONDS);
callableWrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
when(callableWrapper.filterDuplicates()).thenReturn(true);
when(callableWrapper.getElement().getKey()).thenReturn("key");
asyncExecutor.queue(callableWrapper, false);
asyncExecutor.queue(callableWrapper, false);
boolean finalResult = asyncExecutor.queue(callableWrapper, true);
assertTrue("Last submission should have succeeded", finalResult);
verify(executor, times(3)).execute(same(callableWrapper));
}
@Test
public void testPendingCommandSubmission() {
XCallable<?> callable = mock(XCallable.class);
when(callable.getType()).thenReturn(DEFAULT_TYPE);
Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
asyncExecutor.handleConcurrencyExceeded(callableWrapper);
asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
verify(executor).execute(eq(callableWrapper));
assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
assertEquals("List of pending commands should be empty", 0, pendingCommandsList.size());
}
@Test
public void testPendingCommandsWithSameType() {
XCallable<?> callable = mock(XCallable.class);
when(callable.getType()).thenReturn(DEFAULT_TYPE);
Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
XCallable<?> secondCallable = mock(XCallable.class);
when(secondCallable.getType()).thenReturn(DEFAULT_TYPE);
CallableWrapper<?> secondWrapper = mock(CallableWrapper.class);
Mockito.<XCallable<?>>when(secondWrapper.getElement()).thenReturn(secondCallable);
asyncExecutor.handleConcurrencyExceeded(callableWrapper);
asyncExecutor.handleConcurrencyExceeded(secondWrapper);
assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
assertEquals("List of pending commands", 2, pendingCommandsList.size());
}
@Test
public void testPendingCommandSubmissionWhenQueueIsFull() {
XCallable<?> callable = mock(XCallable.class);
when(callable.getType()).thenReturn(DEFAULT_TYPE);
Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(true);
activeCommands.set(10);
asyncExecutor.handleConcurrencyExceeded(callableWrapper);
asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
verifyZeroInteractions(executor);
assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
assertEquals("List of pending commands should be empty", 0, pendingCommandsList.size());
}
@Test
public void testPendingCommandSubmissionWhenMaxConcurrencyReached() {
XCallable<?> callable = mock(XCallable.class);
when(callable.getType()).thenReturn(DEFAULT_TYPE);
Mockito.<XCallable<?>>when(callableWrapper.getElement()).thenReturn(callable);
when(callableQueueService.canSubmitCallable(eq(callable))).thenReturn(false);
asyncExecutor.handleConcurrencyExceeded(callableWrapper);
asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
verifyZeroInteractions(executor);
assertEquals("Number of pending commands", 1, pendingCommandsPerType.size());
Set<CallableWrapper<?>> pendingCommandsList = pendingCommandsPerType.get(DEFAULT_TYPE);
assertNotNull("List of pending commands doesn't exist", pendingCommandsList);
assertEquals("List of pending commands list should not be empty", 1, pendingCommandsList.size());
}
@Test
public void testQueueDump() {
CallableWrapper<?> pendingCallable = mock(CallableWrapper.class);
CallableWrapper<?> waitingCallable = mock(CallableWrapper.class);
ScheduledXCallable delayedXCallable = mock(ScheduledXCallable.class);
@SuppressWarnings("unchecked")
AccessibleRunnableScheduledFuture<ScheduledXCallable> asrf = mock(AccessibleRunnableScheduledFuture.class);
Mockito.<CallableWrapper<?>>when(delayedXCallable.getCallableWrapper()).thenReturn(waitingCallable);
when(asrf.getTask()).thenReturn(delayedXCallable);
when(pendingCallable.toString()).thenReturn("pendingCallable");
when(waitingCallable.toString()).thenReturn("waitingCallable");
when(callableWrapper.toString()).thenReturn("callableWrapper");
priorityBlockingQueue.add(callableWrapper);
delayQueue.add(asrf);
pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(pendingCallable));
List<String> queueDump = asyncExecutor.getQueueDump();
assertEquals("Size", 3, queueDump.size());
assertTrue("PendingCallable not found", queueDump.contains("pendingCallable"));
assertTrue("WaitingCallable not found", queueDump.contains("waitingCallable"));
assertTrue("CallableWrapper not found", queueDump.contains("callableWrapper"));
}
@Test
public void testAntiStarvationWhenDelayIsAboveMaxWait() {
asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES,
AWAIT_TERMINATION_TIMEOUT_SECONDS);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L);
when(callableWrapper.getPriority()).thenReturn(0);
pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
verify(callableWrapper).setPriority(1);
verify(callableWrapper).setDelay(eq(0L), eq(TimeUnit.MILLISECONDS));
}
@Test
public void testAntiStarvationWhenDelayIsBelowMaxWait() {
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-200L);
when(callableWrapper.getPriority()).thenReturn(0);
pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
verify(callableWrapper, never()).setPriority(anyInt());
verify(callableWrapper, never()).setDelay(anyLong(), any(TimeUnit.class));
}
@Test
public void testAntiStarvationWhenPriorityIsHighest() {
asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES,
AWAIT_TERMINATION_TIMEOUT_SECONDS);
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L);
when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY);
pendingCommandsPerType.put(DEFAULT_TYPE, Sets.newHashSet(callableWrapper));
asyncExecutor.checkMaxConcurrency(DEFAULT_TYPE);
verify(callableWrapper, never()).setPriority(anyInt());
verify(callableWrapper, never()).setDelay(anyLong(), any(TimeUnit.class));
}
@Test
public void testShutDown() throws InterruptedException {
when(executor.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
when(scheduledExecutor.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true);
asyncExecutor.shutdown();
verify(executor).shutdown();
verify(executor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS));
verify(scheduledExecutor).shutdown();
verify(scheduledExecutor).awaitTermination(eq(1000L), eq(TimeUnit.MILLISECONDS));
}
@Test
public void testPriorityHandling() {
asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, DEFAULT_MAXWAIT, 100,
AWAIT_TERMINATION_TIMEOUT_SECONDS);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
CallableWrapper<?> wrapper = (CallableWrapper<?>) invocation.getArguments()[0];
priorityBlockingQueue.add(wrapper);
return null;
}
}).when(executor).execute(any(Runnable.class));
List<CallableWrapper<?>> mockedWrappers = new ArrayList<>();
for (int i = 0; i < 100; i++) {
CallableWrapper<?> wrapper = mock(CallableWrapper.class, Mockito.RETURNS_DEEP_STUBS);
when(wrapper.getPriority()).thenReturn(i);
when(wrapper.getInitialDelay()).thenReturn(0L);
when(wrapper.filterDuplicates()).thenReturn(true);
when(wrapper.getElement().getName()).thenReturn(String.valueOf(i));
mockedWrappers.add(wrapper);
}
for (CallableWrapper<?> callable : mockedWrappers) {
asyncExecutor.queue(callable, false);
}
CallableWrapper<?> firstElement = priorityBlockingQueue.poll();
CallableWrapper<?> lastElement = null;
CallableWrapper<?> previous = null;
do {
previous = lastElement;
lastElement = priorityBlockingQueue.poll();
} while (lastElement != null);
lastElement = previous;
assertEquals("Priority - first element", 99, firstElement.getPriority());
assertEquals("Priority - last element", 0, lastElement.getPriority());
}
private void testIllegalPriority(int prio) {
when(callableWrapper.getPriority()).thenReturn(prio);
boolean exceptionThrown = false;
Throwable cause = null;
try {
asyncExecutor.queue(callableWrapper, false);
} catch (RuntimeException e) {
exceptionThrown = true;
cause = e.getCause();
}
assertTrue("Exception was not thrown", exceptionThrown);
verifyZeroInteractions(scheduledExecutor);
verifyZeroInteractions(executor);
assertTrue("Illegal exception", cause instanceof IllegalArgumentException);
verify(callableWrapper).removeFromUniqueCallables();
}
private void configureMockScheduler() {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ScheduledXCallable target = (ScheduledXCallable) invocation.getArguments()[0];
target.run();
return null;
}
}).when(scheduledExecutor).schedule(any(ScheduledXCallable.class), any(Long.class),
any(TimeUnit.class));
}
private AsyncXCommandExecutor createExecutor(boolean needMaxConcurrencyCheck, int maxActiveCallables,
long maxWait, int priorities, int awaitTerminationTimeoutSeconds) {
return new AsyncXCommandExecutor(needMaxConcurrencyCheck,
callableQueueService,
maxActiveCallables,
executor,
scheduledExecutor,
priorityBlockingQueue,
delayQueue,
pendingCommandsPerType,
activeCommands,
maxWait,
priorities,
awaitTerminationTimeoutSeconds);
}
}