JAMES-2586 Fix postgres mailbox acl concurrency issue (#2551)

Co-authored-by: hung phan <hphan@linagora.com>
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
new file mode 100644
index 0000000..f87e809
--- /dev/null
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+public class PostgresACLUpsertException extends RuntimeException { // unchecked; raised by PostgresMailboxDAO.upsertACL(id, acl, version) when its version-guarded UPDATE changes no row, and the only type PostgresMailboxMapper.updateACL retries on
+    public PostgresACLUpsertException(String message) { // message-only: no cause-accepting constructor is declared
+        super(message);
+    }
+}
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
index 0974c05..e68bb3f 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
@@ -19,10 +19,12 @@
 
 package org.apache.james.mailbox.postgres.mail;
 
+import java.time.Duration;
 import java.util.function.Function;
 
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.acl.ACLDiff;
+import org.apache.james.mailbox.exception.UnsupportedRightException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxId;
@@ -32,10 +34,9 @@
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 
-import com.github.fge.lambdas.Throwing;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 public class PostgresMailboxMapper implements MailboxMapper {
     private final PostgresMailboxDAO postgresMailboxDAO;
@@ -96,21 +97,22 @@
 
     @Override
     public Mono<ACLDiff> updateACL(Mailbox mailbox, MailboxACL.ACLCommand mailboxACLCommand) {
-        return upsertACL(mailbox,
-            mailbox.getACL(),
-            Throwing.supplier(() -> mailbox.getACL().apply(mailboxACLCommand)).get());
+        return postgresMailboxDAO.getACL(mailbox.getMailboxId()) // read the stored ACL together with its version
+            .flatMap(aclAndVersion -> {
+                MailboxACL currentACL = aclAndVersion.getLeft();
+                try {
+                    MailboxACL updatedACL = currentACL.apply(mailboxACLCommand);
+                    return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), updatedACL, aclAndVersion.getRight()) // write guarded by the version read above
+                        .thenReturn(ACLDiff.computeDiff(currentACL, updatedACL));
+                } catch (UnsupportedRightException e) {
+                    throw new RuntimeException(e);
+                }
+            }).retryWhen(Retry.backoff(3, Duration.ofMillis(100)).filter(PostgresACLUpsertException.class::isInstance)); // resubscribe (re-read + re-apply) only when the guarded write failed
     }
 
     @Override
     public Mono<ACLDiff> setACL(Mailbox mailbox, MailboxACL mailboxACL) {
-        return upsertACL(mailbox, mailbox.getACL(), mailboxACL);
-    }
-
-    private Mono<ACLDiff> upsertACL(Mailbox mailbox, MailboxACL oldACL, MailboxACL newACL) {
-        return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), newACL)
-            .then(Mono.fromCallable(() -> {
-                mailbox.setACL(newACL);
-                return ACLDiff.computeDiff(oldACL, newACL);
-            }));
+        return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), mailboxACL) // two-argument overload: replaces the stored ACL without any version predicate
+            .thenReturn(ACLDiff.computeDiff(mailbox.getACL(), mailboxACL)); // diff is computed against the ACL carried by the passed-in Mailbox (not re-read from the database); the Mailbox object itself is no longer mutated here
     }
 }
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
index 5b17924..0c30296 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
@@ -47,6 +47,7 @@
         Field<Long> MAILBOX_LAST_UID = DSL.field("mailbox_last_uid", BIGINT);
         Field<Long> MAILBOX_HIGHEST_MODSEQ = DSL.field("mailbox_highest_modseq", BIGINT);
         Field<Hstore> MAILBOX_ACL = DSL.field("mailbox_acl", org.jooq.impl.DefaultDataType.getDefaultDataType("hstore").asConvertedDataType(new HstoreBinding()));
+        Field<Long> MAILBOX_ACL_VERSION = DSL.field("mailbox_acl_version", BIGINT.notNull().defaultValue(DSL.field("0", BIGINT)));
 
         Name MAILBOX_NAME_USER_NAME_NAMESPACE_UNIQUE_CONSTRAINT = DSL.name("mailbox_mailbox_name_user_name_mailbox_namespace_key");
 
@@ -60,6 +61,7 @@
                 .column(MAILBOX_LAST_UID)
                 .column(MAILBOX_HIGHEST_MODSEQ)
                 .column(MAILBOX_ACL)
+                .column(MAILBOX_ACL_VERSION)
                 .constraint(DSL.primaryKey(MAILBOX_ID))
                 .constraint(DSL.constraint(MAILBOX_NAME_USER_NAME_NAMESPACE_UNIQUE_CONSTRAINT).unique(MAILBOX_NAME, USER_NAME, MAILBOX_NAMESPACE))))
             .supportsRowLevelSecurity()
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
index 616e4bd..a8bd57b 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.postgres.mail.dao;
 
 import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ACL;
+import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ACL_VERSION;
 import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_HIGHEST_MODSEQ;
 import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ID;
 import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_LAST_UID;
@@ -55,6 +56,7 @@
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.model.search.Wildcard;
 import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.mail.PostgresACLUpsertException;
 import org.apache.james.mailbox.postgres.mail.PostgresMailbox;
 import org.apache.james.mailbox.store.MailboxExpressionBackwardCompatibility;
 import org.jooq.Condition;
@@ -150,15 +152,34 @@
             .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailbox.getMailboxId())));
     }
 
+    public Mono<Void> upsertACL(MailboxId mailboxId, MailboxACL acl, Long currentAclVersion) { // conditional write: stores acl only if the row still carries currentAclVersion
+        return postgresExecutor.executeReturnAffectedRowsCount(dslContext -> Mono.from(dslContext.update(TABLE_NAME)
+                .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
+                .set(MAILBOX_ACL_VERSION, currentAclVersion + 1) // bump the version so a writer still holding the old value no longer matches
+                .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))
+                .and(MAILBOX_ACL_VERSION.eq(currentAclVersion)))) // guard: version must be unchanged since the caller read it
+            .filter(count -> count > 0) // zero affected rows becomes an empty Mono
+            .switchIfEmpty(Mono.error(new PostgresACLUpsertException("Upsert mailbox acl failed with mailboxId " + mailboxId.serialize()))) // no row matched id + version; NOTE(review): a missing mailbox id ends up here as well
+            .then();
+    }
+
     public Mono<Void> upsertACL(MailboxId mailboxId, MailboxACL acl) {
         return postgresExecutor.executeReturnAffectedRowsCount(dslContext -> Mono.from(dslContext.update(TABLE_NAME)
-            .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
-            .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
+                .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl)) // unconditional overwrite: no version predicate in the WHERE clause below
+                .set(MAILBOX_ACL_VERSION, MAILBOX_ACL_VERSION.plus(1L)) // typed in-database increment (version = version + 1), so version-guarded writers that read the old value stop matching
+                .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
             .filter(count -> count > 0)
             .switchIfEmpty(Mono.error(new RuntimeException("Upsert mailbox acl failed with mailboxId " + mailboxId.serialize())))
             .then();
     }
 
+    public Mono<Pair<MailboxACL, Long>> getACL(MailboxId mailboxId) { // yields (ACL, ACL version) selected from the row with this mailbox id
+        return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(MAILBOX_ACL, MAILBOX_ACL_VERSION)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid())))) // NOTE(review): presumably an absent row gives an empty Mono rather than an error - confirm executeRow semantics
+            .map(record -> Pair.of(Optional.ofNullable(record.get(MAILBOX_ACL)).map(HSTORE_TO_MAILBOX_ACL_FUNCTION).orElse(new MailboxACL()), record.get(MAILBOX_ACL_VERSION))); // a null ACL column is replaced by new MailboxACL(); the version is passed through as stored
+    }
+
     public Flux<PostgresMailbox> findMailboxesByUsername(Username userName) {
         return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MAILBOX_ID,
                     MAILBOX_NAME,
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
index 9f700ac..67f44e7 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
@@ -19,18 +19,63 @@
 
 package org.apache.james.mailbox.postgres.mail;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
 import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.google.common.collect.ImmutableMap;
+
 class PostgresMailboxMapperACLTest extends MailboxMapperACLTest {
     @RegisterExtension
     static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE);
 
+    private PostgresMailboxMapper mailboxMapper; // the mapper built in createMailboxMapper(), kept so the test below can call it directly
+
     @Override
     protected MailboxMapper createMailboxMapper() {
-        return new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+        mailboxMapper = new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+        return mailboxMapper;
+    }
+
+    @Test
+    protected void updateAclShouldWorkWellInMultiThreadEnv() throws ExecutionException, InterruptedException { // 10 concurrent updateACL calls on one mailbox must all be reflected in the stored ACL
+        MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Administer, MailboxACL.Right.Write);
+        MailboxACL.Rfc4314Rights newRights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Write);
+
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> {
+                int userNumber = threadNumber / 2; // integer division: threads 2k and 2k+1 both target "user" + k
+                MailboxACL.EntryKey key = MailboxACL.EntryKey.createUserEntryKey("user" + userNumber);
+                if (threadNumber % 2 == 0) { // even thread: replacement command carrying Administer + Write
+                    return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(rights).asReplacement())
+                        .then();
+                } else { // odd thread: addition command carrying Write only
+                    return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(newRights).asAddition())
+                        .then();
+                }
+            })
+            .threadCount(10)
+            .operationCount(1)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        MailboxACL expectedMailboxACL = new MailboxACL(IntStream.range(0, 5).boxed() // user0..user4, each expected to hold `rights`; presumably the outcome is the same whichever of a user's two commands was applied first
+            .collect(ImmutableMap.toImmutableMap(userNumber -> MailboxACL.EntryKey.createUserEntryKey("user" + userNumber), userNumber -> rights)));
+
+        assertThat(
+            mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId()) // re-read from the database instead of trusting the in-memory Mailbox
+                .block()
+                .getACL())
+            .isEqualTo(expectedMailboxACL);
     }
 }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
index 88e8768..6e2f94a 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
@@ -43,7 +43,7 @@
     private static final Username USER_1 = Username.of("user1");
     private static final Username USER_2 = Username.of("user2");
 
-    private Mailbox benwaInboxMailbox;
+    protected Mailbox benwaInboxMailbox;
 
     private MailboxMapper mailboxMapper;