blob: a275005e3036ced59554371ea91c1b4394a76c86 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.webadmin.integration.rabbitmq;
import static io.restassured.RestAssured.given;
import static io.restassured.RestAssured.when;
import static io.restassured.RestAssured.with;
import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
import static org.awaitility.Duration.ONE_MINUTE;
import static org.awaitility.Duration.TEN_SECONDS;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.james.CassandraExtension;
import org.apache.james.CassandraRabbitMQJamesServerMain;
import org.apache.james.DockerElasticSearchExtension;
import org.apache.james.GuiceJamesServer;
import org.apache.james.GuiceModuleTestExtension;
import org.apache.james.JamesServerBuilder;
import org.apache.james.JamesServerExtension;
import org.apache.james.core.Username;
import org.apache.james.junit.categories.BasicFeature;
import org.apache.james.mailbox.DefaultMailboxes;
import org.apache.james.mailbox.events.Event;
import org.apache.james.mailbox.events.Group;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.modules.AwsS3BlobStoreExtension;
import org.apache.james.modules.MailboxProbeImpl;
import org.apache.james.modules.RabbitMQExtension;
import org.apache.james.modules.TestJMAPServerModule;
import org.apache.james.probe.DataProbe;
import org.apache.james.utils.DataProbeImpl;
import org.apache.james.utils.WebAdminGuiceProbe;
import org.apache.james.webadmin.WebAdminUtils;
import org.apache.james.webadmin.integration.WebadminIntegrationTestModule;
import org.apache.james.webadmin.routes.EventDeadLettersRoutes;
import org.apache.james.webadmin.routes.TasksRoutes;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.core.ConditionFactory;
import org.eclipse.jetty.http.HttpStatus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.google.inject.Module;
import com.google.inject.multibindings.Multibinder;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
@Tag(BasicFeature.TAG)
class RabbitMQEventDeadLettersIntegrationTest {
public static class RetryEventsListenerGroup extends Group {
}
public static class RetryEventsListener implements MailboxListener.GroupMailboxListener {
static final Group GROUP = new RetryEventsListenerGroup();
private final AtomicInteger totalCalls;
private int callsBeforeSuccess;
private Map<Event.EventId, Integer> callsByEventId;
private List<Event> successfulEvents;
RetryEventsListener() {
this.callsBeforeSuccess = 0;
this.callsByEventId = new HashMap<>();
this.successfulEvents = new ArrayList<>();
this.totalCalls = new AtomicInteger(0);
}
@Override
public Group getDefaultGroup() {
return GROUP;
}
@Override
public void event(Event event) throws Exception {
totalCalls.incrementAndGet();
if (done(event)) {
callsByEventId.remove(event.getEventId());
successfulEvents.add(event);
} else {
increaseRetriesCount(event);
throw new RuntimeException("throw to trigger retry");
}
}
private void increaseRetriesCount(Event event) {
callsByEventId.put(event.getEventId(), retriesCount(event) + 1);
}
int retriesCount(Event event) {
return callsByEventId.getOrDefault(event.getEventId(), 0);
}
boolean done(Event event) {
return retriesCount(event) >= callsBeforeSuccess;
}
List<Event> getSuccessfulEvents() {
return successfulEvents;
}
void callsPerEventBeforeSuccess(int retriesBeforeSuccess) {
this.callsBeforeSuccess = retriesBeforeSuccess;
}
}
public static class RetryEventsListenerExtension implements GuiceModuleTestExtension {
private RetryEventsListener retryEventsListener;
@Override
public void beforeEach(ExtensionContext extensionContext) throws Exception {
retryEventsListener = new RetryEventsListener();
}
@Override
public Module getModule() {
return binder -> Multibinder.newSetBinder(binder, MailboxListener.GroupMailboxListener.class)
.addBinding()
.toInstance(retryEventsListener);
}
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return parameterContext.getParameter().getType() == RetryEventsListener.class;
}
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
return retryEventsListener;
}
}
@RegisterExtension
static JamesServerExtension testExtension = new JamesServerBuilder()
.extension(new DockerElasticSearchExtension())
.extension(new CassandraExtension())
.extension(new AwsS3BlobStoreExtension())
.extension(new RabbitMQExtension())
.extension(new RetryEventsListenerExtension())
.server(configuration -> GuiceJamesServer.forConfiguration(configuration)
.combineWith(CassandraRabbitMQJamesServerMain.MODULES)
.overrideWith(TestJMAPServerModule.limitToTenMessages())
.overrideWith(new WebadminIntegrationTestModule()))
.build();
//This value is duplicated from default configuration to ensure we keep the same behavior over time
//unless we really want to change that default value
private static final int MAX_RETRIES = 3;
private static final String DOMAIN = "domain.tld";
private static final String BOB = "bob@" + DOMAIN;
private static final String BOB_PASSWORD = "bobPassword";
private static final String EVENTS_ACTION = "reDeliver";
private static final String GROUP_ID = new RetryEventsListenerGroup().asString();
private static final MailboxPath BOB_INBOX_PATH = MailboxPath.inbox(Username.of(BOB));
private Duration slowPacedPollInterval = ONE_HUNDRED_MILLISECONDS;
private ConditionFactory calmlyAwait = Awaitility.with()
.pollInterval(slowPacedPollInterval)
.and()
.with()
.pollDelay(slowPacedPollInterval)
.await();
private ConditionFactory awaitAtMostTenSeconds = calmlyAwait.atMost(10, TimeUnit.SECONDS);
private MailboxProbeImpl mailboxProbe;
@BeforeEach
void setUp(GuiceJamesServer guiceJamesServer) throws Exception {
DataProbe dataProbe = guiceJamesServer.getProbe(DataProbeImpl.class);
mailboxProbe = guiceJamesServer.getProbe(MailboxProbeImpl.class);
WebAdminGuiceProbe webAdminGuiceProbe = guiceJamesServer.getProbe(WebAdminGuiceProbe.class);
RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminGuiceProbe.getWebAdminPort())
.build();
dataProbe.addDomain(DOMAIN);
dataProbe.addUser(BOB, BOB_PASSWORD);
}
private MailboxId generateInitialEvent() {
return mailboxProbe.createMailbox(BOB_INBOX_PATH);
}
private void generateSecondEvent() {
mailboxProbe.createMailbox(MailboxPath.forUser(Username.of(BOB), DefaultMailboxes.OUTBOX));
}
private String retrieveFirstFailedInsertionId() {
calmlyAwait.atMost(TEN_SECONDS)
.untilAsserted(() ->
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.then()
.body(".", hasSize(1)));
return (String) with()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.jsonPath()
.getList(".")
.get(0);
}
@Test
void failedEventShouldBeStoredInDeadLetterUnderItsGroupId(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
waitForCalls(retryEventsListener, MAX_RETRIES + 1);
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.then()
.statusCode(HttpStatus.OK_200)
.contentType(ContentType.JSON)
.body(".", hasSize(1));
}
@Test
void successfulEventShouldNotBeStoredInDeadLetter(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES - 1);
generateInitialEvent();
calmlyAwait.atMost(ONE_MINUTE).until(() -> !retryEventsListener.successfulEvents.isEmpty());
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups")
.then()
.statusCode(HttpStatus.OK_200)
.contentType(ContentType.JSON)
.body(".", hasSize(0));
}
@Test
void groupIdOfFailedEventShouldBeStoredInDeadLetter(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
waitForCalls(retryEventsListener, MAX_RETRIES + 1);
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups")
.then()
.statusCode(HttpStatus.OK_200)
.contentType(ContentType.JSON)
.body(".", containsInAnyOrder(GROUP_ID));
}
@Test
void failedEventShouldBeStoredInDeadLetter(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
MailboxId mailboxId = generateInitialEvent();
waitForCalls(retryEventsListener, MAX_RETRIES + 1);
String failedInsertionId = retrieveFirstFailedInsertionId();
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID + "/" + failedInsertionId)
.then()
.statusCode(HttpStatus.OK_200)
.contentType(ContentType.JSON)
.body("MailboxAdded.mailboxId", is(mailboxId.serialize()))
.body("MailboxAdded.user", is(BOB))
.body("MailboxAdded.mailboxPath.namespace", is(BOB_INBOX_PATH.getNamespace()))
.body("MailboxAdded.mailboxPath.user", is(BOB_INBOX_PATH.getUser().asString()))
.body("MailboxAdded.mailboxPath.name", is(BOB_INBOX_PATH.getName()));
}
@Test
void multipleFailedEventShouldBeStoredInDeadLetter(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
generateSecondEvent();
waitForCalls(retryEventsListener, (MAX_RETRIES + 1) * 2);
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.then()
.statusCode(HttpStatus.OK_200)
.contentType(ContentType.JSON)
.body(".", hasSize(2));
}
@Test
void failedEventShouldNotBeInDeadLetterAfterBeingDeleted(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
waitForCalls(retryEventsListener, MAX_RETRIES + 1);
String failedInsertionId = retrieveFirstFailedInsertionId();
with()
.delete(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID + "/" + failedInsertionId);
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID + "/" + failedInsertionId)
.then()
.statusCode(HttpStatus.NOT_FOUND_404);
}
@Test
void taskShouldBeCompletedAfterSuccessfulRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
waitForCalls(retryEventsListener, MAX_RETRIES + 1);
String failedInsertionId = retrieveFirstFailedInsertionId();
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID + "/" + failedInsertionId)
.jsonPath()
.get("taskId");
given()
.basePath(TasksRoutes.BASE)
.when()
.get(taskId + "/await")
.then()
.body("status", is("completed"))
.body("additionalInformation.successfulRedeliveriesCount", is(1))
.body("additionalInformation.failedRedeliveriesCount", is(0))
.body("additionalInformation.group", is(GROUP_ID))
.body("additionalInformation.insertionId", is(failedInsertionId));
}
@Test
void failedEventShouldNotBeInDeadLettersAfterSuccessfulRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
waitForCalls(retryEventsListener, MAX_RETRIES + 1);
String failedInsertionId = retrieveFirstFailedInsertionId();
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID + "/" + failedInsertionId)
.jsonPath()
.get("taskId");
with()
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
when()
.get("/events/deadLetter/groups/" + GROUP_ID + "/" + failedInsertionId)
.then()
.statusCode(HttpStatus.NOT_FOUND_404);
}
@Test
void failedEventShouldBeCorrectlyProcessedByListenerAfterSuccessfulRedelivery(RetryEventsListener retryEventsListener) throws InterruptedException {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
waitForCalls(retryEventsListener, MAX_RETRIES + 1);
String failedInsertionId = retrieveFirstFailedInsertionId();
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID + "/" + failedInsertionId)
.jsonPath()
.get("taskId");
with()
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
awaitAtMostTenSeconds.until(() -> retryEventsListener.getSuccessfulEvents().size() == 1);
}
private void waitForCalls(RetryEventsListener retryEventsListener, int count) {
calmlyAwait.atMost(ONE_MINUTE).until(() -> retryEventsListener.totalCalls.intValue() >= count);
}
@Test
void taskShouldBeCompletedAfterSuccessfulGroupRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
generateSecondEvent();
waitForCalls(retryEventsListener, (MAX_RETRIES + 1) * 2);
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.jsonPath()
.get("taskId");
given()
.basePath(TasksRoutes.BASE)
.when()
.get(taskId + "/await")
.then()
.body("status", is("completed"))
.body("additionalInformation.successfulRedeliveriesCount", is(2))
.body("additionalInformation.failedRedeliveriesCount", is(0))
.body("additionalInformation.group", is(GROUP_ID));
}
@Test
void multipleFailedEventsShouldNotBeInDeadLettersAfterSuccessfulGroupRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
generateSecondEvent();
waitForCalls(retryEventsListener, (MAX_RETRIES + 1) * 2);
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.jsonPath()
.get("taskId");
with()
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.then()
.statusCode(HttpStatus.OK_200)
.contentType(ContentType.JSON)
.body(".", hasSize(0));
}
@Test
void multipleFailedEventsShouldBeCorrectlyProcessedByListenerAfterSuccessfulGroupRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
generateSecondEvent();
waitForCalls(retryEventsListener, (MAX_RETRIES + 1) * 2);
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.jsonPath()
.get("taskId");
with()
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
awaitAtMostTenSeconds.until(() -> retryEventsListener.getSuccessfulEvents().size() == 2);
}
@Test
void taskShouldBeCompletedAfterSuccessfulAllRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
generateSecondEvent();
waitForCalls(retryEventsListener, (MAX_RETRIES + 1) * 2);
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH)
.jsonPath()
.get("taskId");
given()
.basePath(TasksRoutes.BASE)
.when()
.get(taskId + "/await")
.then()
.body("status", is("completed"))
.body("additionalInformation.successfulRedeliveriesCount", is(2))
.body("additionalInformation.failedRedeliveriesCount", is(0));
}
@Test
void multipleFailedEventsShouldNotBeInDeadLettersAfterSuccessfulAllRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
generateSecondEvent();
waitForCalls(retryEventsListener, (MAX_RETRIES + 1) * 2);
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH)
.jsonPath()
.get("taskId");
with()
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID)
.then()
.statusCode(HttpStatus.OK_200)
.contentType(ContentType.JSON)
.body(".", hasSize(0));
}
@Test
void multipleFailedEventsShouldBeCorrectlyProcessedByListenerAfterSuccessfulAllRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES + 1);
generateInitialEvent();
generateSecondEvent();
waitForCalls(retryEventsListener, (MAX_RETRIES + 1) * 2);
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH)
.jsonPath()
.get("taskId");
with()
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
awaitAtMostTenSeconds.until(() -> retryEventsListener.getSuccessfulEvents().size() == 2);
}
@Disabled("retry rest API delivers only once, see JAMES-2907. We need same retry cound for this test to work")
@Test
void failedEventShouldStillBeInDeadLettersAfterFailedRedelivery(RetryEventsListener retryEventsListener) {
retryEventsListener.callsPerEventBeforeSuccess(MAX_RETRIES * 2 + 1);
generateInitialEvent();
waitForCalls(retryEventsListener, MAX_RETRIES + 1);
String failedInsertionId = retrieveFirstFailedInsertionId();
String taskId = with()
.queryParam("action", EVENTS_ACTION)
.post(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID + "/" + failedInsertionId)
.jsonPath()
.get("taskId");
with()
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
String newFailedInsertionId = retrieveFirstFailedInsertionId();
when()
.get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID + "/" + newFailedInsertionId)
.then()
.statusCode(HttpStatus.OK_200);
}
}