blob: 0cc6396c937f71249f0cb47871561aa2fa113eb5 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.webadmin.integration.rabbitmq;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;
import static io.restassured.RestAssured.given;
import static io.restassured.RestAssured.with;
import static io.restassured.config.EncoderConfig.encoderConfig;
import static io.restassured.config.RestAssuredConfig.newConfig;
import static org.apache.james.backends.rabbitmq.RabbitMQFixture.calmlyAwait;
import static org.apache.james.jmap.JmapRFCCommonRequests.UserCredential;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.apache.james.CassandraExtension;
import org.apache.james.CassandraRabbitMQJamesConfiguration;
import org.apache.james.CassandraRabbitMQJamesServerMain;
import org.apache.james.DockerOpenSearchExtension;
import org.apache.james.GuiceJamesServer;
import org.apache.james.JamesServerBuilder;
import org.apache.james.JamesServerExtension;
import org.apache.james.SearchConfiguration;
import org.apache.james.backends.cassandra.init.ClusterFactory;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
import org.apache.james.core.Username;
import org.apache.james.jmap.JmapGuiceProbe;
import org.apache.james.jmap.JmapRFCCommonRequests;
import org.apache.james.junit.categories.BasicFeature;
import org.apache.james.junit.categories.Unstable;
import org.apache.james.mailbox.MessageManager.AppendCommand;
import org.apache.james.mailbox.cassandra.mail.task.MailboxMergingTask;
import org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV3Table;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MailboxConstants;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.probe.ACLProbe;
import org.apache.james.mime4j.dom.Message;
import org.apache.james.modules.ACLProbeImpl;
import org.apache.james.modules.AwsS3BlobStoreExtension;
import org.apache.james.modules.MailboxProbeImpl;
import org.apache.james.modules.RabbitMQExtension;
import org.apache.james.modules.TestJMAPServerModule;
import org.apache.james.modules.blobstore.BlobStoreConfiguration;
import org.apache.james.server.CassandraProbe;
import org.apache.james.task.TaskManager;
import org.apache.james.util.Port;
import org.apache.james.utils.DataProbeImpl;
import org.apache.james.utils.WebAdminGuiceProbe;
import org.apache.james.webadmin.WebAdminUtils;
import org.apache.james.webadmin.routes.CassandraMailboxMergingRoutes;
import org.apache.james.webadmin.routes.TasksRoutes;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.datastax.oss.driver.api.core.CqlSession;
import io.restassured.RestAssured;
import io.restassured.builder.RequestSpecBuilder;
import io.restassured.http.ContentType;
import io.restassured.specification.RequestSpecification;
@Tag(BasicFeature.TAG)
class FixingGhostMailboxTest {
private static final String DOMAIN = "domain.tld";
private static final String CEDRIC = "cedric@" + DOMAIN;
private static final String BOB = "bob@" + DOMAIN;
private static final String ALICE = "alice@" + DOMAIN;
private static final String ALICE_SECRET = "aliceSecret";
private static final String BOB_SECRET = "bobSecret";
private static final Duration THIRTY_SECONDS = Duration.ofSeconds(30);
@RegisterExtension
static JamesServerExtension testExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
CassandraRabbitMQJamesConfiguration.builder()
.workingDirectory(tmpDir)
.configurationFromClasspath()
.blobStore(BlobStoreConfiguration.builder()
.s3()
.disableCache()
.deduplication()
.noCryptoConfig())
.searchConfiguration(SearchConfiguration.openSearch())
.build())
.extension(new DockerOpenSearchExtension())
.extension(new CassandraExtension())
.extension(new AwsS3BlobStoreExtension())
.extension(new RabbitMQExtension())
.server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
.overrideWith(new TestJMAPServerModule())
.overrideWith(binder -> binder.bind(CassandraConfiguration.class)
.toInstance(CassandraConfiguration.builder()
.mailboxReadRepair(0)
.mailboxCountersReadRepairMax(0)
.mailboxCountersReadRepairChanceOneHundred(0)
.build())))
.build();
private UserCredential aliceCredential;
private MailboxProbeImpl mailboxProbe;
private ACLProbe aclProbe;
private ComposedMessageId message1;
private MailboxId aliceGhostInboxId;
private MailboxPath aliceInboxPath;
private ComposedMessageId message2;
private RequestSpecification webadminSpecification;
@BeforeEach
void setup(GuiceJamesServer server) throws Throwable {
WebAdminGuiceProbe webAdminProbe = server.getProbe(WebAdminGuiceProbe.class);
mailboxProbe = server.getProbe(MailboxProbeImpl.class);
aclProbe = server.getProbe(ACLProbeImpl.class);
Port jmapPort = server.getProbe(JmapGuiceProbe.class).getJmapPort();
RestAssured.requestSpecification = new RequestSpecBuilder()
.setContentType(ContentType.JSON)
.setAccept(ContentType.JSON)
.setConfig(newConfig().encoderConfig(encoderConfig().defaultContentCharset(StandardCharsets.UTF_8)))
.setPort(jmapPort.getValue())
.addHeader("accept", "application/json; jmapVersion=rfc-8621")
.build();
RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
webadminSpecification = WebAdminUtils.buildRequestSpecification(webAdminProbe.getWebAdminPort())
.build();
server.getProbe(DataProbeImpl.class).fluent()
.addDomain(DOMAIN)
.addUser(ALICE, ALICE_SECRET)
.addUser(BOB, BOB_SECRET);
aliceCredential = JmapRFCCommonRequests.getUserCredential(Username.of(ALICE), ALICE_SECRET);
CassandraProbe probe = server.getProbe(CassandraProbe.class);
ClusterConfiguration cassandraConfiguration = probe.getConfiguration();
try (CqlSession session = ClusterFactory.create(cassandraConfiguration, probe.getMainKeyspaceConfiguration())) {
simulateGhostMailboxBug(session);
}
}
private void simulateGhostMailboxBug(CqlSession session) throws MailboxException, IOException {
// State before ghost mailbox bug
// Alice INBOX is delegated to Bob and contains one message
aliceInboxPath = MailboxPath.inbox(Username.of(ALICE));
aliceGhostInboxId = mailboxProbe.createMailbox(MailboxConstants.USER_NAMESPACE, ALICE, MailboxConstants.INBOX);
aclProbe.addRights(aliceInboxPath, BOB, MailboxACL.FULL_RIGHTS);
message1 = mailboxProbe.appendMessage(ALICE, aliceInboxPath, AppendCommand.from(generateMessageContent()));
testExtension.await();
// Simulate ghost mailbox bug
session.execute(deleteFrom(CassandraMailboxPathV3Table.TABLE_NAME)
.where(column(CassandraMailboxPathV3Table.NAMESPACE).isEqualTo(literal(MailboxConstants.USER_NAMESPACE)),
column(CassandraMailboxPathV3Table.USER).isEqualTo(literal(ALICE)),
column(CassandraMailboxPathV3Table.MAILBOX_NAME).isEqualTo(literal(MailboxConstants.INBOX)))
.build());
// trigger provisioning
given()
.auth().basic(aliceCredential.username().asString(), aliceCredential.password())
.body("""
{
"using": [
"urn:ietf:params:jmap:core",
"urn:ietf:params:jmap:mail"
],
"methodCalls": [
[
"Mailbox/get",
{
"accountId": "%s",
"ids": null
},
"c1"
]
]
}""".formatted(aliceCredential.accountId()))
.when()
.post("/jmap")
.then()
.statusCode(200);
// Received a new message
message2 = mailboxProbe.appendMessage(ALICE, aliceInboxPath, AppendCommand.from(generateMessageContent()));
testExtension.await();
}
private Message generateMessageContent() throws IOException {
return Message.Builder.of()
.setSubject("toto")
.setBody("content", StandardCharsets.UTF_8)
.build();
}
@Test
void ghostMailboxBugShouldChangeMailboxId() {
MailboxId newAliceInbox = mailboxProbe.getMailboxId(MailboxConstants.USER_NAMESPACE, ALICE, MailboxConstants.INBOX);
assertThat(aliceGhostInboxId).isNotEqualTo(newAliceInbox);
}
@Test
void ghostMailboxBugShouldDiscardOldContent() {
MailboxId newAliceInbox = mailboxProbe.getMailboxId(MailboxConstants.USER_NAMESPACE, ALICE, MailboxConstants.INBOX);
calmlyAwait
.timeout(THIRTY_SECONDS)
.untilAsserted(() ->
given()
.auth().basic(aliceCredential.username().asString(), aliceCredential.password())
.body("""
{
"using": [ "urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:apache:james:params:jmap:mail:shares" ],
"methodCalls": [
[
"Email/query",
{
"accountId": "%s",
"filter": {
"inMailbox": "%s"
}
},
"c1"
]
]
}
""".formatted(aliceCredential.accountId(), newAliceInbox.serialize()))
.when()
.post("/jmap")
.then()
.statusCode(200)
.body("methodResponses[0][1].ids", hasSize(1))
.body("methodResponses[0][1].ids", not(contains(message1.getMessageId().serialize())))
.body("methodResponses[0][1].ids", contains(message2.getMessageId().serialize())));
}
/*
* 1 expectation failed.
* JSON path [0][1].messageIds doesn't match.
* Expected: a collection with size <2>
* Actual: [95b42310-47b7-11eb-90b1-f5389ace5056]
* https://builds.apache.org/blue/organizations/jenkins/james%2FApacheJames/detail/PR-268/44/tests
*/
@Test
@Tag(Unstable.TAG)
void webadminCanMergeTwoMailboxes() {
MailboxId newAliceInbox = mailboxProbe.getMailboxId(MailboxConstants.USER_NAMESPACE, ALICE, MailboxConstants.INBOX);
fixGhostMailboxes(newAliceInbox);
calmlyAwait
.timeout(THIRTY_SECONDS)
.untilAsserted(() -> given()
.auth().basic(aliceCredential.username().asString(), aliceCredential.password())
.body("""
{
"using": [ "urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:apache:james:params:jmap:mail:shares" ],
"methodCalls": [
[
"Email/query",
{
"accountId": "%s",
"filter": {
"inMailbox": "%s"
}
},
"c1"
]
]
}
""".formatted(aliceCredential.accountId(), newAliceInbox.serialize()))
.when()
.post("/jmap")
.then()
.statusCode(200)
.body("methodResponses[0][1].ids", hasSize(2))
.body("methodResponses[0][1].ids", containsInAnyOrder(
message1.getMessageId().serialize(),
message2.getMessageId().serialize())));
}
@Test
void webadminCanMergeTwoMailboxesRights() throws Exception {
MailboxId newAliceInbox = mailboxProbe.getMailboxId(MailboxConstants.USER_NAMESPACE, ALICE, MailboxConstants.INBOX);
aclProbe.addRights(aliceInboxPath, CEDRIC, MailboxACL.FULL_RIGHTS);
fixGhostMailboxes(newAliceInbox);
given()
.auth().basic(aliceCredential.username().asString(), aliceCredential.password())
.header("accept", "application/json; jmapVersion=rfc-8621")
.body("""
{
"using": [ "urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:apache:james:params:jmap:mail:shares" ],
"methodCalls": [
[
"Mailbox/get",
{
"accountId": "%s",
"ids": ["%s"]
},
"c1"
]
]
}""".formatted(aliceCredential.accountId(), newAliceInbox.serialize()))
.when()
.post("/jmap")
.then()
.statusCode(200)
.body("methodResponses[0][1].list[0].rights", hasKey(BOB))
.body("methodResponses[0][1].list[0].rights", hasKey(CEDRIC));
}
@Test
void oldGhostedMailboxShouldNoMoreBeAccessible() throws Exception {
MailboxId newAliceInbox = mailboxProbe.getMailboxId(MailboxConstants.USER_NAMESPACE, ALICE, MailboxConstants.INBOX);
aclProbe.addRights(aliceInboxPath, CEDRIC, MailboxACL.FULL_RIGHTS);
fixGhostMailboxes(newAliceInbox);
given()
.auth().basic(aliceCredential.username().asString(), aliceCredential.password())
.body("""
{
"using": [ "urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:apache:james:params:jmap:mail:shares"],
"methodCalls": [
[
"Email/query",
{
"accountId": "%s",
"filter": {
"inMailbox": "%s"
}
},
"c1"
]
]
}
""".formatted(aliceCredential.accountId(), aliceGhostInboxId.serialize()))
.when()
.post("/jmap")
.then()
.statusCode(200)
.body("methodResponses[0][1].type", Matchers.is("invalidArguments"))
.body("methodResponses[0][1].description", Matchers.is(aliceGhostInboxId.serialize() + " can not be found"));
}
@Test
void mergingMailboxTaskShouldBeInformative() {
MailboxId newAliceInbox = mailboxProbe.getMailboxId(MailboxConstants.USER_NAMESPACE, ALICE, MailboxConstants.INBOX);
String taskId = fixGhostMailboxes(newAliceInbox);
with()
.spec(webadminSpecification)
.basePath(TasksRoutes.BASE)
.when()
.get(taskId + "/await")
.then()
.body("status", is(TaskManager.Status.COMPLETED.getValue()))
.body("taskId", is(taskId))
.body("additionalInformation.oldMailboxId", is(aliceGhostInboxId.serialize()))
.body("additionalInformation.newMailboxId", is(newAliceInbox.serialize()))
.body("additionalInformation.totalMessageCount", is(1))
.body("additionalInformation.messageMovedCount", is(1))
.body("additionalInformation.messageFailedCount", is(0))
.body("type", is(MailboxMergingTask.MAILBOX_MERGING.asString()))
.body("submitDate", is(not(nullValue())))
.body("startedDate", is(not(nullValue())))
.body("completedDate", is(not(nullValue())));
}
private String fixGhostMailboxes(MailboxId newAliceInbox) {
String taskId = given()
.spec(webadminSpecification)
.body("{" +
" \"mergeOrigin\":\"" + aliceGhostInboxId.serialize() + "\"," +
" \"mergeDestination\":\"" + newAliceInbox.serialize() + "\"" +
"}")
.post(CassandraMailboxMergingRoutes.BASE)
.jsonPath()
.getString("taskId");
given()
.spec(webadminSpecification)
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
testExtension.await();
return taskId;
}
}