// blob: e0bdad4abc2ea28ac0c6d2c8f6b0cc17fa864975 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.webadmin.routes;
import static io.restassured.RestAssured.given;
import static io.restassured.RestAssured.when;
import static io.restassured.RestAssured.with;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.List;
import javax.mail.Flags;
import org.apache.james.backends.opensearch.DockerElasticSearchExtension;
import org.apache.james.backends.opensearch.ElasticSearchIndexer;
import org.apache.james.backends.opensearch.ReactorElasticSearchClient;
import org.apache.james.core.Username;
import org.apache.james.json.DTOConverter;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageIdManager;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.opensearch.IndexAttachments;
import org.apache.james.mailbox.opensearch.MailboxElasticSearchConstants;
import org.apache.james.mailbox.opensearch.MailboxIdRoutingKeyFactory;
import org.apache.james.mailbox.opensearch.MailboxIndexCreationUtil;
import org.apache.james.mailbox.opensearch.events.ElasticSearchListeningMessageSearchIndex;
import org.apache.james.mailbox.opensearch.json.MessageToElasticSearchJson;
import org.apache.james.mailbox.opensearch.query.CriterionConverter;
import org.apache.james.mailbox.opensearch.query.QueryConverter;
import org.apache.james.mailbox.opensearch.search.ElasticSearchSearcher;
import org.apache.james.mailbox.indexer.ReIndexer;
import org.apache.james.mailbox.inmemory.InMemoryId;
import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
import org.apache.james.mailbox.inmemory.InMemoryMessageId;
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
import org.apache.james.mailbox.model.ByteContent;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.MessageResult;
import org.apache.james.mailbox.model.ThreadId;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
import org.apache.james.task.Hostname;
import org.apache.james.task.MemoryTaskManager;
import org.apache.james.webadmin.WebAdminServer;
import org.apache.james.webadmin.WebAdminUtils;
import org.apache.james.webadmin.dto.WebAdminIndexationContextInformationDTO.WebAdminErrorRecoveryIndexationDTO;
import org.apache.james.webadmin.dto.WebAdminIndexationContextInformationDTO.WebAdminFullIndexationDTO;
import org.apache.james.webadmin.dto.WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO;
import org.apache.james.webadmin.service.PreviousReIndexingService;
import org.apache.james.webadmin.utils.ErrorResponder;
import org.apache.james.webadmin.utils.JsonTransformer;
import org.apache.mailbox.tools.indexer.FullReindexingTask;
import org.apache.mailbox.tools.indexer.ReIndexerImpl;
import org.apache.mailbox.tools.indexer.ReIndexerPerformer;
import org.apache.mailbox.tools.indexer.SingleMailboxReindexingTask;
import org.apache.mailbox.tools.indexer.SingleMessageReindexingTask;
import org.apache.mailbox.tools.indexer.SingleMessageReindexingTaskAdditionalInformationDTO;
import org.eclipse.jetty.http.HttpStatus;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.restassured.RestAssured;
import reactor.core.publisher.Mono;
class MailboxesRoutesTest {
// User on whose behalf the tests open system sessions and create mailboxes.
static final Username USERNAME = Username.of("benwa@apache.org");
// INBOX path of USERNAME, created by most tests before re-indexing.
static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
// NOTE(review): not referenced in the visible part of this file; confirm usage before removing.
static final int BATCH_SIZE = 1;
// Size argument handed to ElasticSearchSearcher in beforeEach.
static final int SEARCH_SIZE = 1;
// Search backend extension registered with JUnit; beforeEach obtains its client provider and configuration from it.
@RegisterExtension
DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
// WebAdmin server under test, started in beforeEach and destroyed in tearDown.
WebAdminServer webAdminServer;
// Mockito spy around the search index, so tests can stub failures and verify interactions.
ListeningMessageSearchIndex searchIndex;
// In-memory mailbox manager used to provision mailboxes and messages.
InMemoryMailboxManager mailboxManager;
// Used by tests to read back message metadata (e.g. the mod-seq) by message id.
MessageIdManager messageIdManager;
// Task manager executing the re-indexing tasks submitted through the routes; stopped in tearDown.
MemoryTaskManager taskManager;
// Client prepared by MailboxIndexCreationUtil and shared by the indexer and the searcher.
ReactorElasticSearchClient client;
/**
 * Builds the fixture shared by every test: an in-memory mailbox stack whose search index is
 * backed by the client obtained from {@code elasticSearch}, a Mockito spy around that search
 * index, and a started WebAdmin server exposing the tasks and mailboxes routes.
 *
 * @throws Exception if any part of the fixture fails to initialise
 */
@BeforeEach
void beforeEach() throws Exception {
    client = MailboxIndexCreationUtil.prepareDefaultClient(
        elasticSearch.getDockerElasticSearch().clientProvider().get(),
        elasticSearch.getDockerElasticSearch().configuration());

    InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
    MailboxIdRoutingKeyFactory routingKeyFactory = new MailboxIdRoutingKeyFactory();

    // The search index is created lazily by the resources builder through this factory lambda.
    InMemoryIntegrationResources resources = InMemoryIntegrationResources.builder()
        .preProvisionnedFakeAuthenticator()
        .fakeAuthorizator()
        .inVmEventBus()
        .defaultAnnotationLimits()
        .defaultMessageParser()
        .listeningSearchIndex(stage -> new ElasticSearchListeningMessageSearchIndex(
            stage.getMapperFactory(),
            ImmutableSet.of(),
            new ElasticSearchIndexer(client,
                MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS),
            new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE,
                MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, routingKeyFactory),
            new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES),
            stage.getSessionProvider(), routingKeyFactory, messageIdFactory))
        .noPreDeletionHooks()
        .storeQuotaManager()
        .build();

    mailboxManager = resources.getMailboxManager();
    messageIdManager = resources.getMessageIdManager();
    taskManager = new MemoryTaskManager(new Hostname("foo"));
    InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();

    // The re-indexer talks to the spy so that tests can stub and verify index operations.
    searchIndex = spy((ListeningMessageSearchIndex) resources.getSearchIndex());
    ReIndexer reIndexer = new ReIndexerImpl(
        new ReIndexerPerformer(
            mailboxManager,
            searchIndex,
            mailboxManager.getMapperFactory()),
        mailboxManager,
        mailboxManager.getMapperFactory());

    JsonTransformer jsonTransformer = new JsonTransformer();
    TasksRoutes tasksRoutes = new TasksRoutes(taskManager, jsonTransformer,
        DTOConverter.of(
            WebAdminErrorRecoveryIndexationDTO.serializationModule(),
            WebAdminFullIndexationDTO.serializationModule(),
            WebAdminSingleMailboxReindexingTaskAdditionalInformationDTO.serializationModule(),
            SingleMessageReindexingTaskAdditionalInformationDTO.module(mailboxIdFactory)));
    MailboxesRoutes mailboxesRoutes = new MailboxesRoutes(taskManager,
        jsonTransformer,
        ImmutableSet.of(
            new MailboxesRoutes.ReIndexAllMailboxesTaskRegistration(
                reIndexer, new PreviousReIndexingService(taskManager), mailboxIdFactory)),
        ImmutableSet.of(
            new MailboxesRoutes.ReIndexOneMailboxTaskRegistration(
                reIndexer, mailboxIdFactory)),
        ImmutableSet.of(
            new MailboxesRoutes.ReIndexOneMailTaskRegistration(
                reIndexer, mailboxIdFactory)));

    webAdminServer = WebAdminUtils.createWebAdminServer(tasksRoutes, mailboxesRoutes)
        .start();

    RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminServer).build();
    RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
}
/**
 * Releases the per-test fixture: destroys the WebAdmin server, then stops the task manager.
 *
 * The task manager is stopped in a {@code finally} block so it is released even when
 * destroying the server throws, and both fields are null-checked so that a failure part-way
 * through {@code beforeEach} is not masked by a NullPointerException raised here.
 */
@AfterEach
void tearDown() {
    try {
        if (webAdminServer != null) {
            webAdminServer.destroy();
        }
    } finally {
        if (taskManager != null) {
            taskManager.stop();
        }
    }
}
@Nested
class FullReIndexing {
/**
 * Request validation of {@code POST /mailboxes}.
 */
@Nested
class Validation {
    /**
     * A request without the {@code task} query parameter is answered with a 400 error payload.
     */
    @Test
    void fullReIndexingShouldFailWithNoTask() {
        String expectedMessage = "Invalid arguments supplied in the user request";
        String expectedDetails = "'task' query parameter is compulsory. Supported values are [reIndex]";

        when()
            .post("/mailboxes")
        .then()
            .statusCode(HttpStatus.BAD_REQUEST_400)
            .body("statusCode", is(400))
            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
            .body("message", is(expectedMessage))
            .body("details", is(expectedDetails));
    }

    /**
     * A request with an unsupported {@code task} value is answered with a 400 error payload.
     */
    @Test
    void fullReIndexingShouldFailWithBadTask() {
        String expectedMessage = "Invalid arguments supplied in the user request";
        String expectedDetails = "Invalid value supplied for query parameter 'task': bad. Supported values are [reIndex]";

        when()
            .post("/mailboxes?task=bad")
        .then()
            .statusCode(HttpStatus.BAD_REQUEST_400)
            .body("statusCode", is(400))
            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
            .body("message", is(expectedMessage))
            .body("details", is(expectedDetails));
    }
}
@Nested
class TaskDetails {
/**
 * A full re-index submitted when the test has created no mailbox or message completes and
 * reports zero processed messages with the default running options.
 */
@Test
void fullReIndexingShouldNotFailWhenNoMail() {
    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    String awaitPath = reIndexTaskId + "/await";

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(awaitPath)
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(0))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * With one message appended to INBOX, a full re-index completes and reports exactly one
 * successfully reprocessed message.
 */
@Test
void fullReIndexingShouldReturnTaskDetailsWhenMail() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    mailboxManager.createMailbox(INBOX, session).get();
    MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
    inbox.appendMessage(
        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
        session);

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    String awaitPath = reIndexTaskId + "/await";

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(awaitPath)
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(1))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * The {@code messagesPerSecond} query parameter is echoed back in the running options of the
 * completed full re-indexing task.
 */
@Test
void fullReIndexingWithMessagesPerSecondShouldReturnTaskDetailsWhenMail() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    mailboxManager.createMailbox(INBOX, session).get();
    MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
    inbox.appendMessage(
        MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
        session);

    String reIndexTaskId = with()
        .queryParam("messagesPerSecond", 1)
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    String awaitPath = reIndexTaskId + "/await";

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(awaitPath)
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(1))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(1))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * When the search index is stubbed to fail on {@code add}, the full re-indexing task ends as
 * failed and lists the message uid under its mailbox in {@code messageFailures}.
 */
@Test
void fullReIndexingShouldReturnTaskDetailsWhenFailing() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
    ComposedMessageId appended = inbox
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            session)
        .getId();

    // Every indexing attempt issued through the spy now errors out.
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");

    // The uid is compared as an int, hence the narrowing.
    int expectedUid = (int) appended.getUid().asLong();
    String failedUidPath = "additionalInformation.messageFailures.\"" + mailboxId.serialize() + "\"[0].uid";

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("failed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(0))
        .body("additionalInformation.failedReprocessedMailCount", is(1))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body(failedUidPath, is(expectedUid))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()));
}
/**
 * When the search index is stubbed to fail on {@code deleteAll}, the full re-indexing task
 * ends as failed and reports the mailbox id in {@code mailboxFailures}.
 */
@Test
void userReIndexingShouldReturnTaskDetailsWhenFailingAtTheMailboxLevel() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();

    // Make the mailbox-level cleanup step fail.
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .deleteAll(any(MailboxSession.class), any(MailboxId.class));

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    String awaitPath = reIndexTaskId + "/await";

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(awaitPath)
    .then()
        .body("status", is("failed"))
        .body("taskId", is(notNullValue()))
        .body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
}
/**
 * With two messages in INBOX and the indexed flags of the first one altered directly on the
 * search index, a full re-index in {@code fixOutdated} mode completes and reports both
 * messages as successfully reprocessed.
 */
@Test
void fullReIndexingWithCorrectModeShouldReturnTaskDetailsWhenMails() throws Exception {
    String rawMessage = "header: value\r\n\r\nbody";
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, session).getMailboxEntity();

    ComposedMessageId firstMessage = mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build(rawMessage), session)
        .getId();
    mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build(rawMessage), session);

    List<MessageResult> stored = messageIdManager.getMessages(
        ImmutableList.of(firstMessage.getMessageId()), FetchGroup.MINIMAL, session);
    UpdatedFlags draftUpdate = UpdatedFlags.builder()
        .uid(firstMessage.getUid())
        .modSeq(stored.get(0).getModSeq())
        .oldFlags(new Flags())
        .newFlags(new Flags(Flags.Flag.DRAFT))
        .build();
    // Only the index is touched, so that it diverges from the stored message.
    searchIndex.update(session, mailbox.getMailboxId(), ImmutableList.of(draftUpdate)).block();

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex&mode=fixOutdated")
        .jsonPath()
        .get("taskId");

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(2))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("FIX_OUTDATED"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * A full re-index in {@code rebuildAllNoCleanup} mode completes, reports both messages as
 * reprocessed, and never invokes {@code deleteAll} on the search index.
 */
@Test
void fullReIndexingShouldAcceptRebuildAllNoCleanupMode() throws Exception {
    String rawMessage = "header: value\r\n\r\nbody";
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, session).getMailboxEntity();

    ComposedMessageId firstMessage = mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build(rawMessage), session)
        .getId();
    mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build(rawMessage), session);

    List<MessageResult> stored = messageIdManager.getMessages(
        ImmutableList.of(firstMessage.getMessageId()), FetchGroup.MINIMAL, session);
    UpdatedFlags draftUpdate = UpdatedFlags.builder()
        .uid(firstMessage.getUid())
        .modSeq(stored.get(0).getModSeq())
        .oldFlags(new Flags())
        .newFlags(new Flags(Flags.Flag.DRAFT))
        .build();
    // Only the index is touched, so that it diverges from the stored message.
    searchIndex.update(session, mailbox.getMailboxId(), ImmutableList.of(draftUpdate)).block();

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex&mode=rebuildAllNoCleanup")
        .jsonPath()
        .get("taskId");

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(FullReindexingTask.FULL_RE_INDEXING.asString()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(2))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL_NO_CLEANUP"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));

    // In this mode the index must not be wiped before being rebuilt.
    verify(searchIndex, never()).deleteAll(any(MailboxSession.class), any(MailboxId.class));
}
/**
 * After the indexed flags of a message have been altered directly on the search index, a
 * full re-index in {@code fixOutdated} mode restores the flags that were indexed initially.
 */
@Test
void fullReIndexingWithCorrectModeShouldFixInconsistenciesInES() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, session).getMailboxEntity();
    MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
    ComposedMessageId appended = inbox
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            session)
        .getId();

    Flags flagsBeforeTampering = searchIndex.retrieveIndexedFlags(mailbox, appended.getUid()).block();

    List<MessageResult> stored = messageIdManager.getMessages(
        ImmutableList.of(appended.getMessageId()), FetchGroup.MINIMAL, session);
    UpdatedFlags draftUpdate = UpdatedFlags.builder()
        .uid(appended.getUid())
        .modSeq(stored.get(0).getModSeq())
        .oldFlags(new Flags())
        .newFlags(new Flags(Flags.Flag.DRAFT))
        .build();
    // Only the index is touched, so that it diverges from the stored message.
    searchIndex.update(session, mailbox.getMailboxId(), ImmutableList.of(draftUpdate)).block();

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex&mode=fixOutdated")
        .jsonPath()
        .get("taskId");

    // Block until the task is over; the response body itself is not inspected.
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await");

    Flags flagsAfterReIndex = searchIndex.retrieveIndexedFlags(mailbox, appended.getUid()).block();
    assertThat(flagsAfterReIndex)
        .isEqualTo(flagsBeforeTampering);
}
/**
 * When the index is left untouched, a full re-index in {@code rebuildAllNoCleanup} mode
 * leaves the indexed flags of the message unchanged.
 */
@Test
void fullReIndexingNoCleanupShouldNoopWhenNoInconsistencies() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, session).getMailboxEntity();
    MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
    ComposedMessageId appended = inbox
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            session)
        .getId();

    Flags flagsBeforeReIndex = searchIndex.retrieveIndexedFlags(mailbox, appended.getUid()).block();

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex&mode=rebuildAllNoCleanup")
        .jsonPath()
        .get("taskId");

    // Block until the task is over; the response body itself is not inspected.
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await");

    Flags flagsAfterReIndex = searchIndex.retrieveIndexedFlags(mailbox, appended.getUid()).block();
    assertThat(flagsAfterReIndex)
        .isEqualTo(flagsBeforeReIndex);
}
/**
 * After the message has been removed directly from the search index, a full re-index in
 * {@code rebuildAllNoCleanup} mode makes its initially indexed flags retrievable again.
 */
@Test
void fullReIndexingNoCleanupShouldSolveInconsistencies() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, session).getMailboxEntity();
    MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
    ComposedMessageId appended = inbox
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            session)
        .getId();

    Flags flagsBeforeTampering = searchIndex.retrieveIndexedFlags(mailbox, appended.getUid()).block();

    List<MessageResult> stored = messageIdManager.getMessages(
        ImmutableList.of(appended.getMessageId()), FetchGroup.MINIMAL, session);
    ImmutableList<MessageUid> uidsToDrop = stored.stream()
        .map(MessageResult::getUid)
        .collect(ImmutableList.toImmutableList());
    // Only the index is touched, so that it diverges from the stored message.
    searchIndex.delete(session, mailbox.getMailboxId(), uidsToDrop)
        .block();

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex&mode=rebuildAllNoCleanup")
        .jsonPath()
        .get("taskId");

    // Block until the task is over; the response body itself is not inspected.
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await");

    Flags flagsAfterReIndex = searchIndex.retrieveIndexedFlags(mailbox, appended.getUid()).block();
    assertThat(flagsAfterReIndex)
        .isEqualTo(flagsBeforeTampering);
}
/**
 * When the index is left untouched, a full re-index in {@code fixOutdated} mode leaves the
 * indexed flags of the message unchanged.
 */
@Test
void fullReIndexingWithCorrectModeShouldNotChangeDocumentsInESWhenNoInconsistencies() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, session).getMailboxEntity();
    MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
    ComposedMessageId appended = inbox
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            session)
        .getId();

    Flags flagsBeforeReIndex = searchIndex.retrieveIndexedFlags(mailbox, appended.getUid()).block();

    String reIndexTaskId = with()
        .post("/mailboxes?task=reIndex&mode=fixOutdated")
        .jsonPath()
        .get("taskId");

    // Block until the task is over; the response body itself is not inspected.
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await");

    Flags flagsAfterReIndex = searchIndex.retrieveIndexedFlags(mailbox, appended.getUid()).block();
    assertThat(flagsAfterReIndex)
        .isEqualTo(flagsBeforeReIndex);
}
/**
 * Disabled expectation: a message that exists only in the search index (added through
 * {@code searchIndex.add} without being appended to the mailbox) should no longer be
 * retrievable from the index after a full re-index in {@code fixOutdated} mode.
 */
@Disabled("JAMES-3202 Limitation of the current correct mode reindexation. We only check metadata and fix "
    + "inconsistencies with ES, but we don't check for inconsistencies from ES to metadata")
@Test
void fullReIndexingWithCorrectModeShouldRemoveOrphanMessagesInES() throws Exception {
    MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();

    byte[] content = "Simple message content".getBytes(StandardCharsets.UTF_8);
    MessageUid uid = MessageUid.of(22L);
    SimpleMailboxMessage message = SimpleMailboxMessage.builder()
        .messageId(InMemoryMessageId.of(42L))
        .threadId(ThreadId.fromBaseMessageId(InMemoryMessageId.of(42L)))
        .uid(uid)
        .content(new ByteContent(content))
        .size(content.length)
        // Date(long) expects epoch milliseconds; converting through Instant avoids feeding it
        // epoch seconds, which would yield a date in January 1970.
        .internalDate(Date.from(ZonedDateTime.parse("2018-02-15T15:54:02Z").toInstant()))
        .bodyStartOctet(0)
        .flags(new Flags("myFlags"))
        .properties(new PropertyBuilder())
        .mailboxId(mailboxId)
        .build();

    // Indexed only: the message is never stored in the mailbox.
    searchIndex.add(systemSession, mailbox, message).block();

    String taskId = with()
        .post("/mailboxes?task=reIndex&mode=fixOutdated")
        .jsonPath()
        .get("taskId");

    // Block until the task is over; the response body itself is not inspected.
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(taskId + "/await");

    assertThat(searchIndex.retrieveIndexedFlags(mailbox, uid).blockOptional())
        .isEmpty();
}
}
/**
 * Interactions with the search index triggered by a full re-index.
 */
@Nested
class SideEffects {
    /**
     * A full re-index over a single-message INBOX calls {@code deleteAll} once for the mailbox
     * and {@code add} once for the message, and nothing else on the search index.
     */
    @Test
    void fullReIndexingShouldPerformReIndexingWhenMail() throws Exception {
        MailboxSession session = mailboxManager.createSystemSession(USERNAME);
        MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
        MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
        ComposedMessageId createdMessage = inbox
            .appendMessage(
                MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
                session)
            .getId();

        String reIndexTaskId = with()
            .post("/mailboxes?task=reIndex")
            .jsonPath()
            .get("taskId");

        with()
            .basePath(TasksRoutes.BASE)
            .get(reIndexTaskId + "/await")
        .then()
            .body("status", is("completed"));

        ArgumentCaptor<MailboxId> deletedMailboxId = ArgumentCaptor.forClass(MailboxId.class);
        ArgumentCaptor<Mailbox> indexedMailbox = ArgumentCaptor.forClass(Mailbox.class);
        ArgumentCaptor<MailboxMessage> indexedMessage = ArgumentCaptor.forClass(MailboxMessage.class);

        verify(searchIndex).deleteAll(any(MailboxSession.class), deletedMailboxId.capture());
        verify(searchIndex).add(any(MailboxSession.class), indexedMailbox.capture(), indexedMessage.capture());
        verifyNoMoreInteractions(searchIndex);

        assertThat(deletedMailboxId.getValue())
            .matches(id -> id.equals(mailboxId));
        assertThat(indexedMailbox.getValue())
            .matches(captured -> captured.getMailboxId().equals(mailboxId));
        assertThat(indexedMessage.getValue())
            .matches(captured -> captured.getMailboxId().equals(mailboxId)
                && captured.getUid().equals(createdMessage.getUid()));
    }
}
}
@Nested
class MailboxReIndexing {
/**
 * Request validation of {@code POST /mailboxes/{mailboxId}}.
 */
@Nested
class Validation {
    /**
     * A request on an existing mailbox without the {@code task} query parameter is answered
     * with a 400 error payload.
     */
    @Test
    void mailboxReIndexingShouldFailWithNoTask() throws Exception {
        MailboxSession session = mailboxManager.createSystemSession(USERNAME);
        MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
        String mailboxUrl = "/mailboxes/" + mailboxId.serialize();

        when()
            .post(mailboxUrl)
        .then()
            .statusCode(HttpStatus.BAD_REQUEST_400)
            .body("statusCode", is(400))
            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
            .body("message", is("Invalid arguments supplied in the user request"))
            .body("details", is("'task' query parameter is compulsory. Supported values are [reIndex]"));
    }

    /**
     * A request on an existing mailbox with an unsupported {@code task} value is answered with
     * a 400 error payload.
     */
    @Test
    void mailboxReIndexingShouldFailWithBadTask() throws Exception {
        MailboxSession session = mailboxManager.createSystemSession(USERNAME);
        MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
        String badTaskUrl = "/mailboxes/" + mailboxId.serialize() + "?task=bad";

        when()
            .post(badTaskUrl)
        .then()
            .statusCode(HttpStatus.BAD_REQUEST_400)
            .body("statusCode", is(400))
            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
            .body("message", is("Invalid arguments supplied in the user request"))
            .body("details", is("Invalid value supplied for query parameter 'task': bad. Supported values are [reIndex]"));
    }

    /**
     * A mailbox id that cannot be parsed is answered with a 400 error payload.
     */
    @Test
    void mailboxReIndexingShouldFailWithBadMailboxId() {
        String expectedMessage = "Error while parsing 'mailbox'";

        when()
            .post("/mailboxes/bad?task=reIndex")
        .then()
            .statusCode(HttpStatus.BAD_REQUEST_400)
            .body("statusCode", is(400))
            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
            .body("message", is(expectedMessage));
    }

    /**
     * A well-formed id that matches no mailbox is answered with a 404 error payload.
     */
    @Test
    void mailboxReIndexingShouldFailWithNonExistentMailboxId() {
        String expectedMessage = "mailbox not found";

        when()
            .post("/mailboxes/36?task=reIndex")
        .then()
            .statusCode(HttpStatus.NOT_FOUND_404)
            .body("statusCode", is(404))
            .body("type", is(ErrorResponder.ErrorType.NOT_FOUND.getType()))
            .body("message", is(expectedMessage));
    }
}
@Nested
class TaskDetails {
/**
 * Re-indexing an empty mailbox completes and reports zero processed messages along with the
 * mailbox id and the default running options.
 */
@Test
void mailboxReIndexingShouldNotFailWhenNoMail() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    String reIndexUrl = "/mailboxes/" + mailboxId.serialize() + "?task=reIndex";

    String reIndexTaskId = when()
        .post(reIndexUrl)
        .jsonPath()
        .get("taskId");

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
        .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(0))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * Re-indexing a mailbox holding one message completes and reports exactly one successfully
 * reprocessed message, along with the mailbox id, the default running options and the
 * started, submit and completed dates.
 */
@Test
void mailboxReIndexingShouldReturnTaskDetailsWhenMail() throws Exception {
    MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
    mailboxManager.getMailbox(INBOX, systemSession)
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            systemSession);

    String taskId = when()
        .post("/mailboxes/" + mailboxId.serialize() + "?task=reIndex")
        .jsonPath()
        .get("taskId");

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(taskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
        .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(1))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        // Completed tasks expose a completion date, as asserted by the sibling "completed" tests.
        .body("completedDate", is(notNullValue()));
}
/**
 * The {@code messagesPerSecond} query parameter is echoed back in the running options of the
 * completed single-mailbox re-indexing task, which also exposes its started, submit and
 * completed dates.
 */
@Test
void mailboxReIndexingWithMessagesPerSecondShouldReturnTaskDetailsWhenMail() throws Exception {
    MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
    mailboxManager.getMailbox(INBOX, systemSession)
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            systemSession);

    String taskId = with()
        .queryParam("task", "reIndex")
        .queryParam("messagesPerSecond", 1)
        .post("/mailboxes/" + mailboxId.serialize())
        .jsonPath()
        .get("taskId");

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(taskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
        .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(1))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(1))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        // Completed tasks expose a completion date, as asserted by the sibling "completed" tests.
        .body("completedDate", is(notNullValue()));
}
/**
 * When the search index is stubbed to fail on {@code add}, the single-mailbox re-indexing
 * task ends as failed and lists the message uid under its mailbox in {@code messageFailures}.
 */
@Test
void mailboxReIndexingShouldReturnTaskDetailsWhenFailing() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    MessageManager inbox = mailboxManager.getMailbox(INBOX, session);
    ComposedMessageId appended = inbox
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            session)
        .getId();

    // Every indexing attempt issued through the spy now errors out.
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));

    String reIndexTaskId = with()
        .queryParam("task", "reIndex")
        .post("/mailboxes/" + mailboxId.serialize())
        .jsonPath()
        .get("taskId");

    // The uid is compared as an int, hence the narrowing.
    int expectedUid = (int) appended.getUid().asLong();
    String failedUidPath = "additionalInformation.messageFailures.\"" + mailboxId.serialize() + "\"[0].uid";

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("failed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(0))
        .body("additionalInformation.failedReprocessedMailCount", is(1))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body(failedUidPath, is(expectedUid))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()));
}
/**
 * Stubs {@code searchIndex.deleteAll} to return an error and checks that the awaited
 * mailbox re-indexing task is reported as failed and lists the mailbox in
 * {@code mailboxFailures}.
 */
@Test
void userReIndexingShouldReturnTaskDetailsWhenFailingAtTheMailboxLevel() throws Exception {
    // Given: an empty INBOX whose index cannot be cleared
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .deleteAll(any(MailboxSession.class), any(MailboxId.class));

    // When: the mailbox is re-indexed
    String mailboxEndpoint = "/mailboxes/" + mailboxId.serialize();
    String reIndexTaskId = with()
        .queryParam("task", "reIndex")
        .post(mailboxEndpoint)
        .jsonPath()
        .get("taskId");

    // Then: the failure is attributed to the mailbox itself
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("failed"))
        .body("taskId", is(notNullValue()))
        .body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
}
/**
 * Appends two messages, alters the indexed flags of the first one directly through the
 * search index, then runs a {@code fixOutdated} re-indexing and checks the awaited task
 * details (two messages reprocessed, mode {@code FIX_OUTDATED}).
 */
@Test
void mailboxReIndexingWithCorrectModeShouldReturnTaskDetailsWhenMails() throws Exception {
    // Given: an INBOX with two messages
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, session).getMailboxEntity();
    ComposedMessageId firstMessage = mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session)
        .getId();
    mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session);

    // We update on the searchIndex level to try to create inconsistencies
    List<MessageResult> storedMessages = messageIdManager.getMessages(
        ImmutableList.of(firstMessage.getMessageId()), FetchGroup.MINIMAL, session);
    UpdatedFlags draftFlagUpdate = UpdatedFlags.builder()
        .uid(firstMessage.getUid())
        .modSeq(storedMessages.get(0).getModSeq())
        .oldFlags(new Flags())
        .newFlags(new Flags(Flags.Flag.DRAFT))
        .build();
    searchIndex.update(session, mailbox.getMailboxId(), ImmutableList.of(draftFlagUpdate)).block();

    // When: the mailbox is re-indexed in fixOutdated mode
    String reIndexTaskId = with()
        .queryParam("task", "reIndex")
        .queryParam("mode", "fixOutdated")
        .post("/mailboxes/" + mailboxId.serialize())
        .jsonPath()
        .get("taskId");

    // Then: the awaited task exposes the expected details
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(SingleMailboxReindexingTask.TYPE.asString()))
        .body("additionalInformation.successfullyReprocessedMailCount", is(2))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("FIX_OUTDATED"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * Alters the indexed flags of a message directly through the search index, runs a
 * {@code fixOutdated} re-indexing of the mailbox and expects the indexed flags to be back
 * to the value they had right after the message was appended.
 *
 * <p>The awaited task is also required to be {@code completed}, so that a failing task is
 * reported as such instead of only showing up as a flag mismatch.
 */
@Test
void mailboxReIndexingWithCorrectModeShouldFixInconsistenciesInES() throws Exception {
    // Given: an INBOX with one indexed message
    MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
    ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            systemSession)
        .getId();
    Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();

    List<MessageResult> messages = messageIdManager.getMessages(ImmutableList.of(result.getMessageId()), FetchGroup.MINIMAL, systemSession);
    Flags newFlags = new Flags(Flags.Flag.DRAFT);
    UpdatedFlags updatedFlags = UpdatedFlags.builder()
        .uid(result.getUid())
        .modSeq(messages.get(0).getModSeq())
        .oldFlags(new Flags())
        .newFlags(newFlags)
        .build();
    // We update on the searchIndex level to try to create inconsistencies
    searchIndex.update(systemSession, mailbox.getMailboxId(), ImmutableList.of(updatedFlags)).block();

    // When: the mailbox is re-indexed in fixOutdated mode
    String taskId = with()
        .queryParam("task", "reIndex")
        .queryParam("mode", "fixOutdated")
        .post("/mailboxes/" + mailboxId.serialize())
        .jsonPath()
        .get("taskId");

    // Then: the task finished successfully...
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(taskId + "/await")
    .then()
        .body("status", is("completed"));

    // ...and the index holds the initial flags again
    assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
        .isEqualTo(initialFlags);
}
/**
 * Runs a {@code fixOutdated} re-indexing on a mailbox whose index was not tampered with and
 * expects the indexed flags of its message to be unchanged afterwards.
 *
 * <p>The awaited task is also required to be {@code completed}: unchanged flags alone would
 * equally be observed if the task had failed or done nothing.
 */
@Test
void mailboxReIndexingWithCorrectModeShouldNotChangeDocumentsInESWhenNoInconsistencies() throws Exception {
    // Given: an INBOX with one indexed message
    MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();
    ComposedMessageId result = mailboxManager.getMailbox(INBOX, systemSession)
        .appendMessage(
            MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"),
            systemSession)
        .getId();
    Flags initialFlags = searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block();

    // When: the mailbox is re-indexed in fixOutdated mode
    String taskId = with()
        .queryParam("task", "reIndex")
        .queryParam("mode", "fixOutdated")
        .post("/mailboxes/" + mailboxId.serialize())
        .jsonPath()
        .get("taskId");

    // Then: the task finished successfully...
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(taskId + "/await")
    .then()
        .body("status", is("completed"));

    // ...and the indexed flags were left as they were
    assertThat(searchIndex.retrieveIndexedFlags(mailbox, result.getUid()).block())
        .isEqualTo(initialFlags);
}
/**
 * Adds a message to the search index only (it is never appended to the mailbox), runs a
 * {@code fixOutdated} re-indexing and expects no indexed flags to remain for that uid.
 * Currently disabled, see the {@code @Disabled} reason.
 */
@Disabled("JAMES-3202 Limitation of the current correct mode reindexation. We only check metadata and fix "
    + "inconsistencies with ES, but we don't check for inconsistencies from ES to metadata")
@Test
void mailboxReIndexingWithCorrectModeShouldRemoveOrphanMessagesInES() throws Exception {
    MailboxSession systemSession = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, systemSession).get();
    Mailbox mailbox = mailboxManager.getMailbox(mailboxId, systemSession).getMailboxEntity();

    byte[] content = "Simple message content".getBytes(StandardCharsets.UTF_8);
    MessageUid uid = MessageUid.of(22L);
    SimpleMailboxMessage message = SimpleMailboxMessage.builder()
        .messageId(InMemoryMessageId.of(42L))
        .threadId(ThreadId.fromBaseMessageId(InMemoryMessageId.of(42L)))
        .uid(uid)
        .content(new ByteContent(content))
        .size(content.length)
        // Date(long) expects milliseconds; convert through Instant rather than passing epoch seconds
        .internalDate(Date.from(ZonedDateTime.parse("2018-02-15T15:54:02Z").toInstant()))
        .bodyStartOctet(0)
        .flags(new Flags("myFlags"))
        .properties(new PropertyBuilder())
        .mailboxId(mailboxId)
        .build();

    // The message only exists in the search index
    searchIndex.add(systemSession, mailbox, message).block();

    String taskId = with()
        .queryParam("task", "reIndex")
        .queryParam("mode", "fixOutdated")
        .post("/mailboxes/" + mailboxId.serialize())
        .jsonPath()
        .get("taskId");

    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(taskId + "/await");

    assertThat(searchIndex.retrieveIndexedFlags(mailbox, uid).blockOptional())
        .isEmpty();
}
}
@Nested
class SideEffects {
/**
 * Re-indexes a mailbox holding one message and verifies the interactions with the search
 * index: one {@code deleteAll} for the mailbox, one {@code add} for the message, and
 * nothing else.
 */
@Test
void mailboxReIndexingShouldPerformReIndexingWhenMail() throws Exception {
    // Given: an INBOX containing a single message
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    ComposedMessageId appended = mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session)
        .getId();

    // When: the mailbox is re-indexed and the task has completed
    String reIndexEndpoint = "/mailboxes/" + mailboxId.serialize() + "?task=reIndex";
    String reIndexTaskId = when()
        .post(reIndexEndpoint)
        .jsonPath()
        .get("taskId");
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("completed"));

    // Then: the index was cleared for the mailbox and the message was added back
    ArgumentCaptor<MailboxMessage> addedMessage = ArgumentCaptor.forClass(MailboxMessage.class);
    ArgumentCaptor<MailboxId> clearedMailboxId = ArgumentCaptor.forClass(MailboxId.class);
    ArgumentCaptor<Mailbox> addedToMailbox = ArgumentCaptor.forClass(Mailbox.class);
    verify(searchIndex).deleteAll(any(MailboxSession.class), clearedMailboxId.capture());
    verify(searchIndex).add(any(MailboxSession.class), addedToMailbox.capture(), addedMessage.capture());
    verifyNoMoreInteractions(searchIndex);

    assertThat(clearedMailboxId.getValue())
        .matches(capturedId -> capturedId.equals(mailboxId));
    assertThat(addedToMailbox.getValue())
        .matches(captured -> captured.getMailboxId().equals(mailboxId));
    assertThat(addedMessage.getValue())
        .matches(captured -> captured.getMailboxId().equals(mailboxId)
            && captured.getUid().equals(appended.getUid()));
}
}
}
@Nested
class MessageReIndexing {
@Nested
class Validation {
/**
 * Posting to a message endpoint without the {@code task} query parameter is expected to
 * yield a 400 response explaining that the parameter is compulsory.
 */
@Test
void messageReIndexingShouldFailWithNoTask() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    String messageEndpoint = "/mailboxes/" + mailboxId.serialize() + "/mails/7";

    when()
        .post(messageEndpoint)
    .then()
        .statusCode(HttpStatus.BAD_REQUEST_400)
        .body("statusCode", is(400))
        .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
        .body("message", is("Invalid arguments supplied in the user request"))
        .body("details", is("'task' query parameter is compulsory. Supported values are [reIndex]"));
}
/**
 * Posting to a message endpoint with an unsupported {@code task} value is expected to yield
 * a 400 response naming the rejected value and the supported ones.
 */
@Test
void messageReIndexingShouldFailWithBadTask() throws Exception {
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    String messageEndpoint = "/mailboxes/" + mailboxId.serialize() + "/mails/7?task=bad";

    when()
        .post(messageEndpoint)
    .then()
        .statusCode(HttpStatus.BAD_REQUEST_400)
        .body("statusCode", is(400))
        .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
        .body("message", is("Invalid arguments supplied in the user request"))
        .body("details", is("Invalid value supplied for query parameter 'task': bad. Supported values are [reIndex]"));
}
/**
 * A mailbox id that cannot be parsed is expected to yield a 400 response with a
 * mailbox-parsing error message.
 */
@Test
void messageReIndexingShouldFailWithBadMailboxId() {
    String unparsableMailboxEndpoint = "/mailboxes/bad/mails/7?task=reIndex";

    when()
        .post(unparsableMailboxEndpoint)
    .then()
        .statusCode(HttpStatus.BAD_REQUEST_400)
        .body("statusCode", is(400))
        .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
        .body("message", is("Error while parsing 'mailbox'"));
}
/**
 * A well-formed mailbox id that was never created in this test is expected to yield a 404
 * response.
 */
@Test
void messageReIndexingShouldFailWithNonExistentMailboxId() {
    String unknownMailboxEndpoint = "/mailboxes/36/mails/7?task=reIndex";

    when()
        .post(unknownMailboxEndpoint)
    .then()
        .statusCode(HttpStatus.NOT_FOUND_404)
        .body("statusCode", is(404))
        .body("type", is(ErrorResponder.ErrorType.NOT_FOUND.getType()))
        .body("message", is("mailbox not found"));
}
/**
 * A non-numeric uid path segment is expected to yield a 400 response stating that the uid
 * must be a parsable long.
 */
@Test
void messageReIndexingShouldFailWithBadUid() {
    String unparsableUidEndpoint = "/mailboxes/36/mails/bad?task=reIndex";

    when()
        .post(unparsableUidEndpoint)
    .then()
        .statusCode(HttpStatus.BAD_REQUEST_400)
        .body("statusCode", is(400))
        .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
        .body("message", is("'uid' needs to be a parsable long"));
}
}
@Nested
class TaskDetails {
/**
 * Requests the re-indexing of uid 1 in an empty mailbox and expects the awaited task to be
 * completed and to echo the mailbox id and uid.
 */
@Test
void messageReIndexingShouldNotFailWhenUidNotFound() throws Exception {
    // Given: an INBOX without any message
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();

    // When: a message that was never appended is re-indexed
    String messageEndpoint = "/mailboxes/" + mailboxId.serialize() + "/mails/1?task=reIndex";
    String reIndexTaskId = when()
        .post(messageEndpoint)
        .jsonPath()
        .get("taskId");

    // Then: the task still completes
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(SingleMessageReindexingTask.MESSAGE_RE_INDEXING.asString()))
        .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
        .body("additionalInformation.uid", is(1))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * Re-indexes an existing message by uid and expects the awaited task to be completed and to
 * echo the mailbox id and uid.
 */
@Test
void messageReIndexingShouldReturnTaskDetailsWhenMail() throws Exception {
    // Given: an INBOX containing a single message
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    ComposedMessageId appended = mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session)
        .getId();
    long uid = appended.getUid().asLong();

    // When: that message is re-indexed
    String messageEndpoint = "/mailboxes/" + mailboxId.serialize() + "/mails/"
        + uid + "?task=reIndex";
    String reIndexTaskId = when()
        .post(messageEndpoint)
        .jsonPath()
        .get("taskId");

    // Then: the awaited task exposes the expected details
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is(SingleMessageReindexingTask.MESSAGE_RE_INDEXING.asString()))
        .body("additionalInformation.mailboxId", is(mailboxId.serialize()))
        .body("additionalInformation.uid", is((int) uid))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
}
@Nested
class SideEffects {
/**
 * Re-indexes a single message by uid and verifies that the search index received exactly
 * one {@code add} call for that message, and no other interaction.
 */
@Test
void mailboxReIndexingShouldPerformReIndexingWhenMail() throws Exception {
    // Given: an INBOX containing a single message
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    ComposedMessageId appended = mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session)
        .getId();

    // When: that message is re-indexed and the task has completed
    String messageEndpoint = "/mailboxes/" + mailboxId.serialize() + "/mails/"
        + appended.getUid().asLong() + "?task=reIndex";
    String reIndexTaskId = when()
        .post(messageEndpoint)
        .jsonPath()
        .get("taskId");
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(reIndexTaskId + "/await")
    .then()
        .body("status", is("completed"));

    // Then: the message was added to the index, and nothing else happened
    ArgumentCaptor<MailboxMessage> addedMessage = ArgumentCaptor.forClass(MailboxMessage.class);
    ArgumentCaptor<Mailbox> addedToMailbox = ArgumentCaptor.forClass(Mailbox.class);
    verify(searchIndex).add(any(MailboxSession.class), addedToMailbox.capture(), addedMessage.capture());
    verifyNoMoreInteractions(searchIndex);

    assertThat(addedToMailbox.getValue())
        .matches(captured -> captured.getMailboxId().equals(mailboxId));
    assertThat(addedMessage.getValue())
        .matches(captured -> captured.getMailboxId().equals(mailboxId)
            && captured.getUid().equals(appended.getUid()));
}
}
}
@Nested
class FixingReIndexing {
@Nested
class Validation {
/**
 * After a full re-indexing has been awaited, posting {@code reIndexFailedMessagesOf}
 * without the {@code task} query parameter is expected to yield a 400 response.
 */
@Test
void fixingReIndexingShouldThrowOnMissingTaskQueryParameter() {
    // Given: a previous re-indexing task that has been awaited
    String previousTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    String awaitPrevious = previousTaskId + "/await";
    with()
        .basePath(TasksRoutes.BASE)
        .get(awaitPrevious);

    // When/Then: the fixing request lacks the 'task' parameter
    given()
        .queryParam("reIndexFailedMessagesOf", previousTaskId)
    .when()
        .post("/mailboxes")
    .then()
        .statusCode(HttpStatus.BAD_REQUEST_400)
        .body("statusCode", is(400))
        .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
        .body("message", is("Invalid arguments supplied in the user request"))
        .body("details", is("'task' query parameter is compulsory. Supported values are [reIndex]"));
}
/**
 * After a full re-indexing has been awaited, posting {@code reIndexFailedMessagesOf}
 * together with an unsupported {@code task} value is expected to yield a 400 response.
 */
@Test
void fixingReIndexingShouldFailWithBadTask() {
    // Given: a previous re-indexing task that has been awaited
    String previousTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    String awaitPrevious = previousTaskId + "/await";
    with()
        .basePath(TasksRoutes.BASE)
        .get(awaitPrevious);

    // When/Then: the fixing request carries an unsupported task value
    given()
        .queryParam("reIndexFailedMessagesOf", previousTaskId)
    .when()
        .post("/mailboxes?task=bad")
    .then()
        .statusCode(HttpStatus.BAD_REQUEST_400)
        .body("statusCode", is(400))
        .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
        .body("message", is("Invalid arguments supplied in the user request"))
        .body("details", is("Invalid value supplied for query parameter 'task': bad. Supported values are [reIndex]"));
}
/**
 * Posts a fixing request that references a task id never submitted in this test and
 * expects a 400 response.
 *
 * <p>NOTE(review): the request is sent with {@code task=bad} and the asserted details are
 * the ones for an unsupported {@code task} value, so the rejection checked here looks
 * unrelated to the task id being unknown. To cover the unknown-task path, this presumably
 * needs {@code task=reIndex} and the matching error details. TODO confirm against the route.
 */
@Test
void fixingReIndexingShouldRejectNotExistingTask() {
    String unknownTaskId = "bbdb69c9-082a-44b0-a85a-6e33e74287a5";

    given()
        .queryParam("reIndexFailedMessagesOf", unknownTaskId)
    .when()
        .post("/mailboxes?task=bad")
    .then()
        .statusCode(HttpStatus.BAD_REQUEST_400)
        .body("statusCode", is(400))
        .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
        .body("message", is("Invalid arguments supplied in the user request"))
        .body("details", is("Invalid value supplied for query parameter 'task': bad. Supported values are [reIndex]"));
}
}
@Nested
class TaskDetails {
/**
 * Runs a full re-indexing with no mailbox created, then a fixing re-indexing of that task,
 * and expects the fixing task to complete with zero reprocessed messages.
 */
@Test
void fixingReIndexingShouldNotFailWhenNoMail() {
    // Given: a previous re-indexing task that has been awaited
    String previousTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    with()
        .basePath(TasksRoutes.BASE)
        .get(previousTaskId + "/await");

    // When: the failures of that task are re-indexed
    String fixingTaskId = with()
        .queryParam("reIndexFailedMessagesOf", previousTaskId)
        .queryParam("task", "reIndex")
        .post("/mailboxes")
        .jsonPath()
        .get("taskId");

    // Then: the fixing task completes without processing anything
    String awaitFixing = fixingTaskId + "/await";
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(awaitFixing)
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is("error-recovery-indexation"))
        .body("additionalInformation.successfullyReprocessedMailCount", is(0))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * Makes a first full re-indexing fail on {@code searchIndex.add}, re-stubs {@code add} to
 * succeed, then runs a fixing re-indexing and expects it to complete with one message
 * reprocessed.
 */
@Test
void fixingReIndexingShouldReturnTaskDetailsWhenMail() throws Exception {
    // Given: an INBOX containing a single message
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    mailboxManager.createMailbox(INBOX, session).get();
    mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session);

    // And: a first re-indexing during which adding to the index errors out
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
    String previousTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    with()
        .basePath(TasksRoutes.BASE)
        .get(previousTaskId + "/await");

    // When: adding is stubbed to succeed and the failures are re-indexed
    doReturn(Mono.empty())
        .when(searchIndex)
        .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
    String fixingTaskId = with()
        .queryParam("reIndexFailedMessagesOf", previousTaskId)
        .queryParam("task", "reIndex")
        .post("/mailboxes")
        .jsonPath()
        .get("taskId");

    // Then: the fixing task reports one reprocessed message
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(fixingTaskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is("error-recovery-indexation"))
        .body("additionalInformation.successfullyReprocessedMailCount", is(1))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * Same scenario as a successful fixing re-indexing of one failed message, but the fixing
 * request carries {@code messagesPerSecond=1}, which is expected back in the running
 * options of the awaited task.
 */
@Test
void fixingReIndexingWithMessagePerSecondShouldReturnTaskDetailsWhenMail() throws Exception {
    // Given: an INBOX containing a single message
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    mailboxManager.createMailbox(INBOX, session).get();
    mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session);

    // And: a first re-indexing during which adding to the index errors out
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
    String previousTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    with()
        .basePath(TasksRoutes.BASE)
        .get(previousTaskId + "/await");

    // When: adding is stubbed to succeed and the failures are re-indexed at an explicit rate
    doReturn(Mono.empty())
        .when(searchIndex)
        .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
    String fixingTaskId = with()
        .queryParam("reIndexFailedMessagesOf", previousTaskId)
        .queryParam("task", "reIndex")
        .queryParam("messagesPerSecond", 1)
        .post("/mailboxes")
        .jsonPath()
        .get("taskId");

    // Then: the fixing task reports one reprocessed message and the requested rate
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(fixingTaskId + "/await")
    .then()
        .body("status", is("completed"))
        .body("taskId", is(notNullValue()))
        .body("type", is("error-recovery-indexation"))
        .body("additionalInformation.successfullyReprocessedMailCount", is(1))
        .body("additionalInformation.failedReprocessedMailCount", is(0))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(1))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()))
        .body("completedDate", is(notNullValue()));
}
/**
 * Keeps {@code searchIndex.add} stubbed to return an error across both the initial and the
 * fixing re-indexing, and expects the fixing task to be reported as failed with the message
 * uid listed under the mailbox's {@code messageFailures} entry.
 */
@Test
void fixingReIndexingShouldReturnTaskDetailsWhenFailing() throws Exception {
    // Given: an INBOX containing a single message
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    ComposedMessageId appended = mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session)
        .getId();

    // And: every attempt to add a message to the index errors out
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));

    // And: a first re-indexing that has been awaited
    String previousTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    with()
        .basePath(TasksRoutes.BASE)
        .get(previousTaskId + "/await");

    // When: the failures are re-indexed while the stub is still in place
    String fixingTaskId = with()
        .queryParam("reIndexFailedMessagesOf", previousTaskId)
        .queryParam("task", "reIndex")
        .post("/mailboxes")
        .jsonPath()
        .get("taskId");

    // The matcher compares against an Integer, so the long uid is narrowed
    int expectedUid = (int) appended.getUid().asLong();
    String failedUidPath = "additionalInformation.messageFailures.\"" + mailboxId.serialize() + "\"[0].uid";

    // Then: the fixing task describes the failure
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(fixingTaskId + "/await")
    .then()
        .body("status", is("failed"))
        .body("taskId", is(notNullValue()))
        .body("type", is("error-recovery-indexation"))
        .body("additionalInformation.successfullyReprocessedMailCount", is(0))
        .body("additionalInformation.failedReprocessedMailCount", is(1))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body(failedUidPath, is(expectedUid))
        .body("startedDate", is(notNullValue()))
        .body("submitDate", is(notNullValue()));
}
/**
 * Keeps {@code searchIndex.deleteAll} stubbed to return an error across both the initial
 * and the fixing re-indexing, and expects the fixing task to be reported as failed with the
 * mailbox listed in {@code mailboxFailures}.
 */
@Test
void userReIndexingShouldReturnTaskDetailsWhenFailingAtTheMailboxLevel() throws Exception {
    // Given: an empty INBOX whose index cannot be cleared
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .deleteAll(any(MailboxSession.class), any(MailboxId.class));

    // And: a first re-indexing that has been awaited
    String previousTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    with()
        .basePath(TasksRoutes.BASE)
        .get(previousTaskId + "/await");

    // When: the failures are re-indexed while the stub is still in place
    String fixingTaskId = with()
        .queryParam("reIndexFailedMessagesOf", previousTaskId)
        .queryParam("task", "reIndex")
        .post("/mailboxes")
        .jsonPath()
        .get("taskId");

    // Then: the failure is attributed to the mailbox itself
    given()
        .basePath(TasksRoutes.BASE)
    .when()
        .get(fixingTaskId + "/await")
    .then()
        .body("status", is("failed"))
        .body("taskId", is(notNullValue()))
        .body("additionalInformation.runningOptions.messagesPerSecond", is(50))
        .body("additionalInformation.runningOptions.mode", is("REBUILD_ALL"))
        .body("additionalInformation.mailboxFailures", Matchers.containsInAnyOrder(mailboxId.serialize()));
}
}
@Nested
class SideEffects {
/**
 * Makes a first full re-indexing fail on {@code searchIndex.add}, resets the spy, runs a
 * fixing re-indexing and verifies that the search index then received exactly one
 * {@code add} call for the message, and no other interaction.
 */
@Test
void fixingReIndexingShouldPerformReIndexingWhenMail() throws Exception {
    // Given: an INBOX containing a single message
    MailboxSession session = mailboxManager.createSystemSession(USERNAME);
    MailboxId mailboxId = mailboxManager.createMailbox(INBOX, session).get();
    ComposedMessageId appended = mailboxManager.getMailbox(INBOX, session)
        .appendMessage(MessageManager.AppendCommand.builder().build("header: value\r\n\r\nbody"), session)
        .getId();

    // And: a first re-indexing during which adding to the index errors out
    doReturn(Mono.error(new RuntimeException()))
        .when(searchIndex)
        .add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
    String previousTaskId = with()
        .post("/mailboxes?task=reIndex")
        .jsonPath()
        .get("taskId");
    with()
        .basePath(TasksRoutes.BASE)
        .get(previousTaskId + "/await");

    // Drop the stubbing and the interactions recorded so far
    reset(searchIndex);

    // When: the failures are re-indexed and the fixing task has completed
    String fixingTaskId = with()
        .queryParam("reIndexFailedMessagesOf", previousTaskId)
        .queryParam("task", "reIndex")
        .post("/mailboxes")
        .jsonPath()
        .get("taskId");
    with()
        .basePath(TasksRoutes.BASE)
        .get(fixingTaskId + "/await")
    .then()
        .body("status", is("completed"));

    // Then: the message was added to the index, and nothing else happened
    ArgumentCaptor<MailboxMessage> addedMessage = ArgumentCaptor.forClass(MailboxMessage.class);
    ArgumentCaptor<Mailbox> addedToMailbox = ArgumentCaptor.forClass(Mailbox.class);
    verify(searchIndex).add(any(MailboxSession.class), addedToMailbox.capture(), addedMessage.capture());
    verifyNoMoreInteractions(searchIndex);

    assertThat(addedToMailbox.getValue())
        .matches(captured -> captured.getMailboxId().equals(mailboxId));
    assertThat(addedMessage.getValue())
        .matches(captured -> captured.getMailboxId().equals(mailboxId)
            && captured.getUid().equals(appended.getUid()));
}
}
}
}