JAMES-33775 RSpamDListener (#1095)

diff --git a/third-party/rspamd/pom.xml b/third-party/rspamd/pom.xml
index 51142d3..3b9faea 100644
--- a/third-party/rspamd/pom.xml
+++ b/third-party/rspamd/pom.xml
@@ -120,6 +120,11 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-tests</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>testing-base</artifactId>
             <scope>test</scope>
         </dependency>
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RSpamDListener.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RSpamDListener.java
new file mode 100644
index 0000000..5a242f3
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RSpamDListener.java
@@ -0,0 +1,192 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd;
+
+
+import java.io.InputStream;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.core.Username;
+import org.apache.james.events.Event;
+import org.apache.james.events.EventListener;
+import org.apache.james.events.Group;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.Role;
+import org.apache.james.mailbox.SystemMailboxesProvider;
+import org.apache.james.mailbox.events.MailboxEvents;
+import org.apache.james.mailbox.events.MessageMoveEvent;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.event.SpamEventListener;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class RSpamDListener implements SpamEventListener, EventListener.ReactiveGroupEventListener {
+    public static class RSpamDListenerGroup extends Group {
+
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RSpamDListener.class);
+
+    private static final int LIMIT = 1;
+    private static final Group GROUP = new RSpamDListenerGroup();
+
+    private final RSpamDHttpClient rSpamDHttpClient;
+    private final MailboxManager mailboxManager;
+    private final MailboxSessionMapperFactory mapperFactory;
+    private final SystemMailboxesProvider systemMailboxesProvider;
+
+    @Inject
+    public RSpamDListener(RSpamDHttpClient rSpamDHttpClient, MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, SystemMailboxesProvider systemMailboxesProvider) {
+        this.rSpamDHttpClient = rSpamDHttpClient;
+        this.mailboxManager = mailboxManager;
+        this.mapperFactory = mapperFactory;
+        this.systemMailboxesProvider = systemMailboxesProvider;
+    }
+
+    @Override
+    public Group getDefaultGroup() {
+        return GROUP;
+    }
+
+    @Override
+    public boolean isHandling(Event event) {
+        return event instanceof MessageMoveEvent || event instanceof MailboxEvents.Added;
+    }
+
+    @Override
+    public Publisher<Void> reactiveEvent(Event event) {
+        if (event instanceof MessageMoveEvent) {
+            return handleMessageMoved((MessageMoveEvent) event);
+        } else if (event instanceof MailboxEvents.Added) {
+            return handleMessageAdded((MailboxEvents.Added) event);
+        }
+        return Mono.empty();
+    }
+
+    private Mono<Void> handleMessageAdded(MailboxEvents.Added addedEvent) {
+        return isAppendedToInbox(addedEvent)
+            .filter(FunctionalUtils.identityPredicate())
+            .doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", addedEvent.getEventId().getId()))
+            .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))));
+    }
+
+    private Mono<Void> handleMessageMoved(MessageMoveEvent messageMoveEvent) {
+        return handleMessageMoved(mailboxMessagePublisher(messageMoveEvent), messageMoveEvent);
+    }
+
+    private Mono<Void> reportHamWhenAdded(MailboxEvents.Added addedEvent, MailboxSession session) {
+        return mapperFactory.getMailboxMapper(session)
+            .findMailboxById(addedEvent.getMailboxId())
+            .map(mailbox -> Pair.of(mailbox, mapperFactory.getMessageMapper(session)))
+            .flatMapMany(pair -> Flux.fromIterable(MessageRange.toRanges(addedEvent.getUids()))
+                .flatMap(range -> pair.getRight().findInMailboxReactive(pair.getLeft(), range, MessageMapper.FetchType.FULL, LIMIT)))
+            .map(Throwing.function(MailboxMessage::getFullContent))
+            .flatMap(rSpamDHttpClient::reportAsHam, ReactorUtils.DEFAULT_CONCURRENCY)
+            .then();
+    }
+
+    private Flux<InputStream> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) {
+        return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))))
+            .flatMapMany(messageIdMapper -> messageIdMapper.findReactive(messageMoveEvent.getMessageIds(), MessageMapper.FetchType.FULL))
+            .map(Throwing.function(MailboxMessage::getFullContent));
+    }
+
+    private Mono<Void> handleMessageMoved(Flux<InputStream> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
+        Mono<Boolean> reportHamIfNotSpamDetected = isMessageMovedOutOfSpamMailbox(messageMoveEvent)
+            .filter(FunctionalUtils.identityPredicate())
+            .doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", messageMoveEvent.getEventId().getId()));
+
+        return isMessageMovedToSpamMailbox(messageMoveEvent)
+            .flatMap(isSpam -> {
+                if (isSpam) {
+                    LOGGER.debug("Spam event detected, EventId = {}", messageMoveEvent.getEventId().getId());
+                    return mailboxMessagesPublisher.flatMap(rSpamDHttpClient::reportAsSpam, ReactorUtils.DEFAULT_CONCURRENCY)
+                        .then();
+                } else {
+                    return reportHamIfNotSpamDetected
+                        .flatMapMany(isHam -> mailboxMessagesPublisher.flatMap(rSpamDHttpClient::reportAsHam, ReactorUtils.DEFAULT_CONCURRENCY))
+                        .then();
+                }
+            });
+    }
+
+    @VisibleForTesting
+    Mono<Boolean> isMessageMovedToSpamMailbox(MessageMoveEvent event) {
+        return isMessageMovedToMailbox(event, Role.SPAM);
+    }
+
+    @VisibleForTesting
+    Mono<Boolean> isMessageMovedOutOfSpamMailbox(MessageMoveEvent event) {
+        return isMessageMovedOutToMailbox(event, Role.SPAM)
+            .zipWith(isMessageMovedToMailbox(event, Role.TRASH))
+            .map(tuple -> tuple.getT1() && !tuple.getT2());
+    }
+
+    @VisibleForTesting
+    Mono<Boolean> isAppendedToInbox(MailboxEvents.Added addedEvent) {
+        return Flux.from(systemMailboxesProvider.getMailboxByRole(Role.INBOX, addedEvent.getUsername()))
+            .next()
+            .map(MessageManager::getId)
+            .map(mailboxId -> mailboxId.equals(addedEvent.getMailboxId()))
+            .onErrorResume(e -> {
+                LOGGER.warn("Could not resolve Inbox mailbox", e);
+                return Mono.just(false);
+            });
+    }
+
+    private Mono<Boolean> isMessageMovedToMailbox(MessageMoveEvent event, Role role) {
+        return Flux.from(systemMailboxesProvider.getMailboxByRole(role, event.getUsername()))
+            .next()
+            .map(MessageManager::getId)
+            .map(spamMailboxId -> event.getMessageMoves().addedMailboxIds().contains(spamMailboxId))
+            .onErrorResume(e -> {
+                LOGGER.warn("Could not resolve {} mailbox", role, e);
+                return Mono.just(false);
+            });
+    }
+
+    private Mono<Boolean> isMessageMovedOutToMailbox(MessageMoveEvent event, Role role) {
+        return Flux.from(systemMailboxesProvider.getMailboxByRole(role, event.getUsername()))
+            .next()
+            .map(MessageManager::getId)
+            .map(spamMailboxId -> event.getMessageMoves().removedMailboxIds().contains(spamMailboxId))
+            .onErrorResume(e -> {
+                LOGGER.warn("Could not resolve {} mailbox", role, e);
+                return Mono.just(false);
+            });
+    }
+}
\ No newline at end of file
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/RSpamDListenerTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/RSpamDListenerTest.java
new file mode 100644
index 0000000..dd0db4b
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/RSpamDListenerTest.java
@@ -0,0 +1,295 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rspamd;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+
+import javax.mail.Flags;
+
+import org.apache.james.core.Username;
+import org.apache.james.events.Group;
+import org.apache.james.mailbox.DefaultMailboxes;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MailboxSessionUtil;
+import org.apache.james.mailbox.events.MailboxEvents;
+import org.apache.james.mailbox.events.MessageMoveEvent;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.ByteContent;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageMetaData;
+import org.apache.james.mailbox.model.MessageMoves;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.mailbox.model.UidValidity;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.StoreMailboxManager;
+import org.apache.james.mailbox.store.SystemMailboxesProviderImpl;
+import org.apache.james.mailbox.store.event.EventFactory;
+import org.apache.james.mailbox.store.mail.MailboxMapper;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Mono;
+
+public class RSpamDListenerTest {
+    static final Username USER = Username.of("user");
+    static final MailboxSession MAILBOX_SESSION = MailboxSessionUtil.create(USER);
+    static final UidValidity UID_VALIDITY = UidValidity.of(43);
+    static final TestMessageId MESSAGE_ID = TestMessageId.of(45);
+    static final ThreadId THREAD_ID = ThreadId.fromBaseMessageId(MESSAGE_ID);
+
+    private RSpamDHttpClient rSpamDHttpClient;
+    private RSpamDListener listener;
+    private MailboxSessionMapperFactory mapperFactory;
+
+    private Mailbox inbox;
+    private Mailbox mailbox1;
+    private Mailbox mailbox2;
+    private MailboxId mailboxId1;
+    private MailboxId mailboxId2;
+    private MailboxId spamMailboxId;
+    private MailboxId spamCapitalMailboxId;
+    private MailboxId trashMailboxId;
+
+    @BeforeEach
+    void setup() {
+        rSpamDHttpClient = mock(RSpamDHttpClient.class);
+
+        when(rSpamDHttpClient.reportAsHam(any())).thenReturn(Mono.empty());
+        when(rSpamDHttpClient.reportAsSpam(any())).thenReturn(Mono.empty());
+
+        StoreMailboxManager mailboxManager = spy(InMemoryIntegrationResources.defaultResources().getMailboxManager());
+        SystemMailboxesProviderImpl systemMailboxesProvider = new SystemMailboxesProviderImpl(mailboxManager);
+        when(mailboxManager.createSystemSession(USER))
+            .thenReturn(MAILBOX_SESSION);
+        mapperFactory = mailboxManager.getMapperFactory();
+        MailboxMapper mailboxMapper = mapperFactory.createMailboxMapper(MAILBOX_SESSION);
+        inbox = mailboxMapper.create(MailboxPath.forUser(USER, DefaultMailboxes.INBOX), UID_VALIDITY).block();
+        mailbox1 = mailboxMapper.create(MailboxPath.forUser(USER, "mailbox1"), UID_VALIDITY).block();
+        mailbox2 = mailboxMapper.create(MailboxPath.forUser(USER, "mailbox2"), UID_VALIDITY).block();
+        mailboxId1 = mailbox1.getMailboxId();
+        mailboxId2 = mailbox2.getMailboxId();
+        spamMailboxId = mailboxMapper.create(MailboxPath.forUser(USER, "Spam"), UID_VALIDITY).block().getMailboxId();
+        spamCapitalMailboxId = mailboxMapper.create(MailboxPath.forUser(USER, "SPAM"), UID_VALIDITY).block().getMailboxId();
+        trashMailboxId = mailboxMapper.create(MailboxPath.forUser(USER, "Trash"), UID_VALIDITY).block().getMailboxId();
+
+        listener = new RSpamDListener(rSpamDHttpClient, mailboxManager, mapperFactory, systemMailboxesProvider);
+    }
+
+    @Test
+    void deserializeListenerGroup() throws Exception {
+        assertThat(Group.deserialize("org.apache.james.rspamd.RSpamDListener$RSpamDListenerGroup"))
+            .isEqualTo(new RSpamDListener.RSpamDListenerGroup());
+    }
+
+    @Test
+    void isEventOnSpamMailboxShouldReturnFalseWhenMessageIsMovedToANonSpamMailbox() {
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(mailboxId1)
+                .targetMailboxIds(mailboxId2)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        assertThat(listener.isMessageMovedToSpamMailbox(messageMoveEvent).block()).isFalse();
+    }
+
+    @Test
+    void isEventOnSpamMailboxShouldReturnTrueWhenMailboxIsSpam() {
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(mailboxId1)
+                .targetMailboxIds(spamMailboxId)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        assertThat(listener.isMessageMovedToSpamMailbox(messageMoveEvent).block()).isTrue();
+    }
+
+    @Test
+    void isEventOnSpamMailboxShouldReturnFalseWhenMailboxIsSpamOtherCase() {
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(mailboxId1)
+                .targetMailboxIds(spamCapitalMailboxId)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        assertThat(listener.isMessageMovedToSpamMailbox(messageMoveEvent).block()).isFalse();
+    }
+
+    @Test
+    void isMessageMovedOutOfSpamMailboxShouldReturnFalseWhenMessageMovedBetweenNonSpamMailboxes() {
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(mailboxId1)
+                .targetMailboxIds(mailboxId2)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        assertThat(listener.isMessageMovedOutOfSpamMailbox(messageMoveEvent).block()).isFalse();
+    }
+
+    @Test
+    void isMessageMovedOutOfSpamMailboxShouldReturnFalseWhenMessageMovedOutOfCapitalSpamMailbox() {
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(spamCapitalMailboxId)
+                .targetMailboxIds(mailboxId2)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        assertThat(listener.isMessageMovedOutOfSpamMailbox(messageMoveEvent).block()).isFalse();
+    }
+
+    @Test
+    void isMessageMovedOutOfSpamMailboxShouldReturnTrueWhenMessageMovedOutOfSpamMailbox() {
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(spamMailboxId)
+                .targetMailboxIds(mailboxId2)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        assertThat(listener.isMessageMovedOutOfSpamMailbox(messageMoveEvent).block()).isTrue();
+    }
+
+    @Test
+    void isMessageMovedOutOfSpamMailboxShouldReturnFalseWhenMessageMovedToTrash() {
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(spamMailboxId)
+                .targetMailboxIds(trashMailboxId)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        assertThat(listener.isMessageMovedOutOfSpamMailbox(messageMoveEvent).block()).isFalse();
+    }
+
+
+    @Test
+    void eventShouldCallReportSpamLearningWhenTheMovedEventMatches() throws Exception {
+        createMessage(inbox);
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(mailboxId1)
+                .targetMailboxIds(spamMailboxId)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        listener.event(messageMoveEvent);
+
+        verify(rSpamDHttpClient).reportAsSpam(any());
+    }
+
+    @Test
+    void eventShouldCallHamLearningWhenTheMovedEventMatches() throws Exception {
+        createMessage(inbox);
+
+        MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+            .session(MAILBOX_SESSION)
+            .messageMoves(MessageMoves.builder()
+                .previousMailboxIds(spamMailboxId)
+                .targetMailboxIds(mailboxId1)
+                .build())
+            .messageId(MESSAGE_ID)
+            .build();
+
+        listener.event(messageMoveEvent);
+
+        verify(rSpamDHttpClient).reportAsHam(any());
+    }
+
+    @Test
+    void eventShouldCallReportHamLearningWhenTheMessageIsAddedInInbox() throws Exception {
+        SimpleMailboxMessage message = createMessage(inbox);
+
+        MailboxEvents.Added addedEvent = EventFactory.added()
+            .randomEventId()
+            .mailboxSession(MAILBOX_SESSION)
+            .mailbox(inbox)
+            .addMetaData(message.metaData())
+            .build();
+
+        listener.event(addedEvent);
+
+        verify(rSpamDHttpClient).reportAsHam(any());
+    }
+
+    @Test
+    void eventShouldNotCallReportHamLearningWhenTheMessageIsAddedInAMailboxOtherThanInbox() throws Exception {
+        SimpleMailboxMessage message = createMessage(mailbox1);
+
+        MailboxEvents.Added addedEvent = EventFactory.added()
+            .randomEventId()
+            .mailboxSession(MAILBOX_SESSION)
+            .mailbox(mailbox1)
+            .addMetaData(message.metaData())
+            .build();
+
+        listener.event(addedEvent);
+
+        verify(rSpamDHttpClient, never()).reportAsHam(any());
+    }
+
+    private SimpleMailboxMessage createMessage(Mailbox mailbox) throws MailboxException {
+        int size = 45;
+        int bodyStartOctet = 25;
+        byte[] content = "Subject: test\r\n\r\nBody\r\n".getBytes(StandardCharsets.UTF_8);
+        SimpleMailboxMessage message = new SimpleMailboxMessage(MESSAGE_ID, THREAD_ID, new Date(),
+            size, bodyStartOctet, new ByteContent(content), new Flags(), new PropertyBuilder().build(),
+            mailbox.getMailboxId());
+        MessageMetaData messageMetaData = mapperFactory.createMessageMapper(null).add(mailbox, message);
+        message.setUid(messageMetaData.getUid());
+        return message;
+    }
+}