blob: b6933479862a7de71aab2b283d984be34a9b227d [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.task;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
class SerialTaskManagerWorkerTest {
private TaskManagerWorker.Listener listener;
private SerialTaskManagerWorker worker;
private final Task successfulTask = new CompletedTask();
private final Task failedTask = new FailedTask();
private final Task throwingTask = new ThrowingTask();
@BeforeEach
void beforeEach() {
listener = mock(TaskManagerWorker.Listener.class);
worker = new SerialTaskManagerWorker(listener);
}
@AfterEach
void tearDown() throws IOException {
worker.close();
}
@Test
void aSuccessfullTaskShouldCompleteSuccessfully() {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), this.successfulTask);
Mono<Task.Result> result = worker.executeTask(taskWithId);
assertThat(result.block()).isEqualTo(Task.Result.COMPLETED);
verify(listener, atLeastOnce()).completed(taskWithId.getId(), Task.Result.COMPLETED, Optional.empty());
}
@Test
void aRunningTaskShouldProvideInformationUpdatesDuringExecution() throws InterruptedException {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
Mono.fromCallable(counter::incrementAndGet)
.delayElement(Duration.ofSeconds(2))
.repeat(10)
.then(Mono.just(Task.Result.COMPLETED))
.block()));
worker.executeTask(taskWithId).subscribe();
TimeUnit.SECONDS.sleep(2);
verify(listener, atLeastOnce()).updated(eq(taskWithId.getId()), notNull());
}
@Test
void aRunningTaskShouldHaveAFiniteNumberOfInformation() throws InterruptedException {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
Mono.fromCallable(counter::incrementAndGet)
.delayElement(Duration.ofSeconds(1))
.repeat(2)
.then(Mono.just(Task.Result.COMPLETED))
.block()));
worker.executeTask(taskWithId).block();
verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
}
@Test
void aRunningTaskShouldEmitAtMostOneInformationPerSecond() {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
Mono.fromCallable(counter::incrementAndGet)
.delayElement(Duration.ofMillis(10))
.repeat(200)
.then(Mono.just(Task.Result.COMPLETED))
.block()));
worker.executeTask(taskWithId).block();
verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
}
@Test
void aFailedTaskShouldCompleteWithFailedStatus() {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), failedTask);
Mono<Task.Result> result = worker.executeTask(taskWithId);
assertThat(result.block()).isEqualTo(Task.Result.PARTIAL);
verify(listener, atLeastOnce()).failed(taskWithId.getId(), Optional.empty());
}
@Test
void aThrowingTaskShouldCompleteWithFailedStatus() {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), throwingTask);
Mono<Task.Result> result = worker.executeTask(taskWithId);
assertThat(result.block()).isEqualTo(Task.Result.PARTIAL);
verify(listener, atLeastOnce()).failed(eq(taskWithId.getId()), eq(Optional.empty()), any(RuntimeException.class));
}
@Test
void theWorkerShouldReportThatATaskIsInProgress() throws InterruptedException {
TaskId id = TaskId.generateTaskId();
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch taskLaunched = new CountDownLatch(1);
Task inProgressTask = new MemoryReferenceTask(() -> {
taskLaunched.countDown();
await(latch);
return Task.Result.COMPLETED;
});
TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
worker.executeTask(taskWithId).subscribe();
await(taskLaunched);
verify(listener, atLeastOnce()).started(id);
verifyNoMoreInteractions(listener);
latch.countDown();
}
@Test
void theWorkerShouldCancelAnInProgressTask() throws InterruptedException {
TaskId id = TaskId.generateTaskId();
AtomicInteger counter = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
Task inProgressTask = new MemoryReferenceTask(() -> {
await(latch);
counter.incrementAndGet();
return Task.Result.COMPLETED;
});
TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache();
resultMono.subscribe();
Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS)
.untilAsserted(() -> verify(listener, atLeastOnce()).started(id));
worker.cancelTask(id);
resultMono.block(Duration.ofSeconds(10));
verify(listener, atLeastOnce()).cancelled(id, Optional.empty());
verifyNoMoreInteractions(listener);
}
private void await(CountDownLatch countDownLatch) throws InterruptedException {
countDownLatch.await();
}
}