JAMES-2586 Fix postgres mailbox acl concurrency issue (#2551)
Co-authored-by: hung phan <hphan@linagora.com>
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
new file mode 100644
index 0000000..f87e809
--- /dev/null
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+public class PostgresACLUpsertException extends RuntimeException {
+ public PostgresACLUpsertException(String message) {
+ super(message);
+ }
+}
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
index 0974c05..e68bb3f 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
@@ -19,10 +19,12 @@
package org.apache.james.mailbox.postgres.mail;
+import java.time.Duration;
import java.util.function.Function;
import org.apache.james.core.Username;
import org.apache.james.mailbox.acl.ACLDiff;
+import org.apache.james.mailbox.exception.UnsupportedRightException;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MailboxId;
@@ -32,10 +34,9 @@
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapper;
-import com.github.fge.lambdas.Throwing;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
public class PostgresMailboxMapper implements MailboxMapper {
private final PostgresMailboxDAO postgresMailboxDAO;
@@ -96,21 +97,22 @@
@Override
public Mono<ACLDiff> updateACL(Mailbox mailbox, MailboxACL.ACLCommand mailboxACLCommand) {
- return upsertACL(mailbox,
- mailbox.getACL(),
- Throwing.supplier(() -> mailbox.getACL().apply(mailboxACLCommand)).get());
+ return postgresMailboxDAO.getACL(mailbox.getMailboxId())
+ .flatMap(pairMailboxACLAndVersion -> {
+ try {
+ MailboxACL newACL = pairMailboxACLAndVersion.getLeft().apply(mailboxACLCommand);
+ return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), newACL, pairMailboxACLAndVersion.getRight())
+ .thenReturn(ACLDiff.computeDiff(pairMailboxACLAndVersion.getLeft(), newACL));
+ } catch (UnsupportedRightException e) {
+ throw new RuntimeException(e);
+ }
+ }).retryWhen(Retry.backoff(3, Duration.ofMillis(100))
+ .filter(throwable -> throwable instanceof PostgresACLUpsertException));
}
@Override
public Mono<ACLDiff> setACL(Mailbox mailbox, MailboxACL mailboxACL) {
- return upsertACL(mailbox, mailbox.getACL(), mailboxACL);
- }
-
- private Mono<ACLDiff> upsertACL(Mailbox mailbox, MailboxACL oldACL, MailboxACL newACL) {
- return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), newACL)
- .then(Mono.fromCallable(() -> {
- mailbox.setACL(newACL);
- return ACLDiff.computeDiff(oldACL, newACL);
- }));
+ return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), mailboxACL)
+ .thenReturn(ACLDiff.computeDiff(mailbox.getACL(), mailboxACL));
}
}
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
index 5b17924..0c30296 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
@@ -47,6 +47,7 @@
Field<Long> MAILBOX_LAST_UID = DSL.field("mailbox_last_uid", BIGINT);
Field<Long> MAILBOX_HIGHEST_MODSEQ = DSL.field("mailbox_highest_modseq", BIGINT);
Field<Hstore> MAILBOX_ACL = DSL.field("mailbox_acl", org.jooq.impl.DefaultDataType.getDefaultDataType("hstore").asConvertedDataType(new HstoreBinding()));
+ Field<Long> MAILBOX_ACL_VERSION = DSL.field("mailbox_acl_version", BIGINT.notNull().defaultValue(DSL.field("0", BIGINT)));
Name MAILBOX_NAME_USER_NAME_NAMESPACE_UNIQUE_CONSTRAINT = DSL.name("mailbox_mailbox_name_user_name_mailbox_namespace_key");
@@ -60,6 +61,7 @@
.column(MAILBOX_LAST_UID)
.column(MAILBOX_HIGHEST_MODSEQ)
.column(MAILBOX_ACL)
+ .column(MAILBOX_ACL_VERSION)
.constraint(DSL.primaryKey(MAILBOX_ID))
.constraint(DSL.constraint(MAILBOX_NAME_USER_NAME_NAMESPACE_UNIQUE_CONSTRAINT).unique(MAILBOX_NAME, USER_NAME, MAILBOX_NAMESPACE))))
.supportsRowLevelSecurity()
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
index 616e4bd..a8bd57b 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
@@ -20,6 +20,7 @@
package org.apache.james.mailbox.postgres.mail.dao;
import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ACL;
+import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ACL_VERSION;
import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_HIGHEST_MODSEQ;
import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ID;
import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_LAST_UID;
@@ -55,6 +56,7 @@
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.model.search.Wildcard;
import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.mail.PostgresACLUpsertException;
import org.apache.james.mailbox.postgres.mail.PostgresMailbox;
import org.apache.james.mailbox.store.MailboxExpressionBackwardCompatibility;
import org.jooq.Condition;
@@ -150,15 +152,34 @@
.switchIfEmpty(Mono.error(new MailboxNotFoundException(mailbox.getMailboxId())));
}
+ public Mono<Void> upsertACL(MailboxId mailboxId, MailboxACL acl, Long currentAclVersion) {
+ return postgresExecutor.executeReturnAffectedRowsCount(dslContext -> Mono.from(dslContext.update(TABLE_NAME)
+ .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
+ .set(MAILBOX_ACL_VERSION, currentAclVersion + 1)
+ .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))
+ .and(MAILBOX_ACL_VERSION.eq(currentAclVersion))))
+ .filter(count -> count > 0)
+ .switchIfEmpty(Mono.error(new PostgresACLUpsertException("Upsert mailbox acl failed with mailboxId " + mailboxId.serialize())))
+ .then();
+ }
+
public Mono<Void> upsertACL(MailboxId mailboxId, MailboxACL acl) {
return postgresExecutor.executeReturnAffectedRowsCount(dslContext -> Mono.from(dslContext.update(TABLE_NAME)
- .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
- .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
+ .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
+ .set(DSL.field(MAILBOX_ACL_VERSION.getName()), (Object) DSL.field(MAILBOX_ACL_VERSION.getName() + " + 1"))
+ .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
.filter(count -> count > 0)
.switchIfEmpty(Mono.error(new RuntimeException("Upsert mailbox acl failed with mailboxId " + mailboxId.serialize())))
.then();
}
+ public Mono<Pair<MailboxACL, Long>> getACL(MailboxId mailboxId) {
+ return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(MAILBOX_ACL, MAILBOX_ACL_VERSION)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
+ .map(record -> Pair.of(Optional.ofNullable(record.get(MAILBOX_ACL)).map(HSTORE_TO_MAILBOX_ACL_FUNCTION).orElse(new MailboxACL()), record.get(MAILBOX_ACL_VERSION)));
+ }
+
public Flux<PostgresMailbox> findMailboxesByUsername(Username userName) {
return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MAILBOX_ID,
MAILBOX_NAME,
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
index 9f700ac..67f44e7 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
@@ -19,18 +19,63 @@
package org.apache.james.mailbox.postgres.mail;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import com.google.common.collect.ImmutableMap;
+
class PostgresMailboxMapperACLTest extends MailboxMapperACLTest {
@RegisterExtension
static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE);
+ private PostgresMailboxMapper mailboxMapper;
+
@Override
protected MailboxMapper createMailboxMapper() {
- return new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+ mailboxMapper = new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+ return mailboxMapper;
+ }
+
+ @Test
+ protected void updateAclShouldWorkWellInMultiThreadEnv() throws ExecutionException, InterruptedException {
+ MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Administer, MailboxACL.Right.Write);
+ MailboxACL.Rfc4314Rights newRights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Write);
+
+ ConcurrentTestRunner.builder()
+ .reactorOperation((threadNumber, step) -> {
+ int userNumber = threadNumber / 2;
+ MailboxACL.EntryKey key = MailboxACL.EntryKey.createUserEntryKey("user" + userNumber);
+ if (threadNumber % 2 == 0) {
+ return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(rights).asReplacement())
+ .then();
+ } else {
+ return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(newRights).asAddition())
+ .then();
+ }
+ })
+ .threadCount(10)
+ .operationCount(1)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ MailboxACL expectedMailboxACL = new MailboxACL(IntStream.range(0, 5).boxed()
+ .collect(ImmutableMap.toImmutableMap(userNumber -> MailboxACL.EntryKey.createUserEntryKey("user" + userNumber), userNumber -> rights)));
+
+ assertThat(
+ mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId())
+ .block()
+ .getACL())
+ .isEqualTo(expectedMailboxACL);
}
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
index 88e8768..6e2f94a 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
@@ -43,7 +43,7 @@
private static final Username USER_1 = Username.of("user1");
private static final Username USER_2 = Username.of("user2");
- private Mailbox benwaInboxMailbox;
+ protected Mailbox benwaInboxMailbox;
private MailboxMapper mailboxMapper;