JAMES-33775 RSpamDListener (#1095)
diff --git a/third-party/rspamd/pom.xml b/third-party/rspamd/pom.xml
index 51142d3..3b9faea 100644
--- a/third-party/rspamd/pom.xml
+++ b/third-party/rspamd/pom.xml
@@ -120,6 +120,11 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>metrics-tests</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RSpamDListener.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RSpamDListener.java
new file mode 100644
index 0000000..5a242f3
--- /dev/null
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RSpamDListener.java
@@ -0,0 +1,192 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd;
+
+
+import java.io.InputStream;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.core.Username;
+import org.apache.james.events.Event;
+import org.apache.james.events.EventListener;
+import org.apache.james.events.Group;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.Role;
+import org.apache.james.mailbox.SystemMailboxesProvider;
+import org.apache.james.mailbox.events.MailboxEvents;
+import org.apache.james.mailbox.events.MessageMoveEvent;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.event.SpamEventListener;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class RSpamDListener implements SpamEventListener, EventListener.ReactiveGroupEventListener {
+ public static class RSpamDListenerGroup extends Group {
+
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RSpamDListener.class);
+
+ private static final int LIMIT = 1;
+ private static final Group GROUP = new RSpamDListenerGroup();
+
+ private final RSpamDHttpClient rSpamDHttpClient;
+ private final MailboxManager mailboxManager;
+ private final MailboxSessionMapperFactory mapperFactory;
+ private final SystemMailboxesProvider systemMailboxesProvider;
+
+ @Inject
+ public RSpamDListener(RSpamDHttpClient rSpamDHttpClient, MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, SystemMailboxesProvider systemMailboxesProvider) {
+ this.rSpamDHttpClient = rSpamDHttpClient;
+ this.mailboxManager = mailboxManager;
+ this.mapperFactory = mapperFactory;
+ this.systemMailboxesProvider = systemMailboxesProvider;
+ }
+
+ @Override
+ public Group getDefaultGroup() {
+ return GROUP;
+ }
+
+ @Override
+ public boolean isHandling(Event event) {
+ return event instanceof MessageMoveEvent || event instanceof MailboxEvents.Added;
+ }
+
+ @Override
+ public Publisher<Void> reactiveEvent(Event event) {
+ if (event instanceof MessageMoveEvent) {
+ return handleMessageMoved((MessageMoveEvent) event);
+ } else if (event instanceof MailboxEvents.Added) {
+ return handleMessageAdded((MailboxEvents.Added) event);
+ }
+ return Mono.empty();
+ }
+
+ private Mono<Void> handleMessageAdded(MailboxEvents.Added addedEvent) {
+ return isAppendedToInbox(addedEvent)
+ .filter(FunctionalUtils.identityPredicate())
+ .doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", addedEvent.getEventId().getId()))
+ .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))));
+ }
+
+ private Mono<Void> handleMessageMoved(MessageMoveEvent messageMoveEvent) {
+ return handleMessageMoved(mailboxMessagePublisher(messageMoveEvent), messageMoveEvent);
+ }
+
+ private Mono<Void> reportHamWhenAdded(MailboxEvents.Added addedEvent, MailboxSession session) {
+ return mapperFactory.getMailboxMapper(session)
+ .findMailboxById(addedEvent.getMailboxId())
+ .map(mailbox -> Pair.of(mailbox, mapperFactory.getMessageMapper(session)))
+ .flatMapMany(pair -> Flux.fromIterable(MessageRange.toRanges(addedEvent.getUids()))
+ .flatMap(range -> pair.getRight().findInMailboxReactive(pair.getLeft(), range, MessageMapper.FetchType.FULL, LIMIT)))
+ .map(Throwing.function(MailboxMessage::getFullContent))
+ .flatMap(rSpamDHttpClient::reportAsHam, ReactorUtils.DEFAULT_CONCURRENCY)
+ .then();
+ }
+
+ private Flux<InputStream> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) {
+ return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))))
+ .flatMapMany(messageIdMapper -> messageIdMapper.findReactive(messageMoveEvent.getMessageIds(), MessageMapper.FetchType.FULL))
+ .map(Throwing.function(MailboxMessage::getFullContent));
+ }
+
+ private Mono<Void> handleMessageMoved(Flux<InputStream> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
+ Mono<Boolean> reportHamIfNotSpamDetected = isMessageMovedOutOfSpamMailbox(messageMoveEvent)
+ .filter(FunctionalUtils.identityPredicate())
+ .doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", messageMoveEvent.getEventId().getId()));
+
+ return isMessageMovedToSpamMailbox(messageMoveEvent)
+ .flatMap(isSpam -> {
+ if (isSpam) {
+ LOGGER.debug("Spam event detected, EventId = {}", messageMoveEvent.getEventId().getId());
+ return mailboxMessagesPublisher.flatMap(rSpamDHttpClient::reportAsSpam, ReactorUtils.DEFAULT_CONCURRENCY)
+ .then();
+ } else {
+ return reportHamIfNotSpamDetected
+ .flatMapMany(isHam -> mailboxMessagesPublisher.flatMap(rSpamDHttpClient::reportAsHam, ReactorUtils.DEFAULT_CONCURRENCY))
+ .then();
+ }
+ });
+ }
+
+ @VisibleForTesting
+ Mono<Boolean> isMessageMovedToSpamMailbox(MessageMoveEvent event) {
+ return isMessageMovedToMailbox(event, Role.SPAM);
+ }
+
+ @VisibleForTesting
+ Mono<Boolean> isMessageMovedOutOfSpamMailbox(MessageMoveEvent event) {
+ return isMessageMovedOutToMailbox(event, Role.SPAM)
+ .zipWith(isMessageMovedToMailbox(event, Role.TRASH))
+ .map(tuple -> tuple.getT1() && !tuple.getT2());
+ }
+
+ @VisibleForTesting
+ Mono<Boolean> isAppendedToInbox(MailboxEvents.Added addedEvent) {
+ return Flux.from(systemMailboxesProvider.getMailboxByRole(Role.INBOX, addedEvent.getUsername()))
+ .next()
+ .map(MessageManager::getId)
+ .map(mailboxId -> mailboxId.equals(addedEvent.getMailboxId()))
+ .onErrorResume(e -> {
+ LOGGER.warn("Could not resolve Inbox mailbox", e);
+ return Mono.just(false);
+ });
+ }
+
+ private Mono<Boolean> isMessageMovedToMailbox(MessageMoveEvent event, Role role) {
+ return Flux.from(systemMailboxesProvider.getMailboxByRole(role, event.getUsername()))
+ .next()
+ .map(MessageManager::getId)
+ .map(spamMailboxId -> event.getMessageMoves().addedMailboxIds().contains(spamMailboxId))
+ .onErrorResume(e -> {
+ LOGGER.warn("Could not resolve {} mailbox", role, e);
+ return Mono.just(false);
+ });
+ }
+
+ private Mono<Boolean> isMessageMovedOutToMailbox(MessageMoveEvent event, Role role) {
+ return Flux.from(systemMailboxesProvider.getMailboxByRole(role, event.getUsername()))
+ .next()
+ .map(MessageManager::getId)
+ .map(spamMailboxId -> event.getMessageMoves().removedMailboxIds().contains(spamMailboxId))
+ .onErrorResume(e -> {
+ LOGGER.warn("Could not resolve {} mailbox", role, e);
+ return Mono.just(false);
+ });
+ }
+}
\ No newline at end of file
diff --git a/third-party/rspamd/src/test/java/org/apache/james/rspamd/RSpamDListenerTest.java b/third-party/rspamd/src/test/java/org/apache/james/rspamd/RSpamDListenerTest.java
new file mode 100644
index 0000000..dd0db4b
--- /dev/null
+++ b/third-party/rspamd/src/test/java/org/apache/james/rspamd/RSpamDListenerTest.java
@@ -0,0 +1,295 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.rspamd;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+
+import javax.mail.Flags;
+
+import org.apache.james.core.Username;
+import org.apache.james.events.Group;
+import org.apache.james.mailbox.DefaultMailboxes;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MailboxSessionUtil;
+import org.apache.james.mailbox.events.MailboxEvents;
+import org.apache.james.mailbox.events.MessageMoveEvent;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.ByteContent;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageMetaData;
+import org.apache.james.mailbox.model.MessageMoves;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.mailbox.model.UidValidity;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.StoreMailboxManager;
+import org.apache.james.mailbox.store.SystemMailboxesProviderImpl;
+import org.apache.james.mailbox.store.event.EventFactory;
+import org.apache.james.mailbox.store.mail.MailboxMapper;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.rspamd.client.RSpamDHttpClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Mono;
+
+public class RSpamDListenerTest {
+ static final Username USER = Username.of("user");
+ static final MailboxSession MAILBOX_SESSION = MailboxSessionUtil.create(USER);
+ static final UidValidity UID_VALIDITY = UidValidity.of(43);
+ static final TestMessageId MESSAGE_ID = TestMessageId.of(45);
+ static final ThreadId THREAD_ID = ThreadId.fromBaseMessageId(MESSAGE_ID);
+
+ private RSpamDHttpClient rSpamDHttpClient;
+ private RSpamDListener listener;
+ private MailboxSessionMapperFactory mapperFactory;
+
+ private Mailbox inbox;
+ private Mailbox mailbox1;
+ private Mailbox mailbox2;
+ private MailboxId mailboxId1;
+ private MailboxId mailboxId2;
+ private MailboxId spamMailboxId;
+ private MailboxId spamCapitalMailboxId;
+ private MailboxId trashMailboxId;
+
+ @BeforeEach
+ void setup() {
+ rSpamDHttpClient = mock(RSpamDHttpClient.class);
+
+ when(rSpamDHttpClient.reportAsHam(any())).thenReturn(Mono.empty());
+ when(rSpamDHttpClient.reportAsSpam(any())).thenReturn(Mono.empty());
+
+ StoreMailboxManager mailboxManager = spy(InMemoryIntegrationResources.defaultResources().getMailboxManager());
+ SystemMailboxesProviderImpl systemMailboxesProvider = new SystemMailboxesProviderImpl(mailboxManager);
+ when(mailboxManager.createSystemSession(USER))
+ .thenReturn(MAILBOX_SESSION);
+ mapperFactory = mailboxManager.getMapperFactory();
+ MailboxMapper mailboxMapper = mapperFactory.createMailboxMapper(MAILBOX_SESSION);
+ inbox = mailboxMapper.create(MailboxPath.forUser(USER, DefaultMailboxes.INBOX), UID_VALIDITY).block();
+ mailbox1 = mailboxMapper.create(MailboxPath.forUser(USER, "mailbox1"), UID_VALIDITY).block();
+ mailbox2 = mailboxMapper.create(MailboxPath.forUser(USER, "mailbox2"), UID_VALIDITY).block();
+ mailboxId1 = mailbox1.getMailboxId();
+ mailboxId2 = mailbox2.getMailboxId();
+ spamMailboxId = mailboxMapper.create(MailboxPath.forUser(USER, "Spam"), UID_VALIDITY).block().getMailboxId();
+ spamCapitalMailboxId = mailboxMapper.create(MailboxPath.forUser(USER, "SPAM"), UID_VALIDITY).block().getMailboxId();
+ trashMailboxId = mailboxMapper.create(MailboxPath.forUser(USER, "Trash"), UID_VALIDITY).block().getMailboxId();
+
+ listener = new RSpamDListener(rSpamDHttpClient, mailboxManager, mapperFactory, systemMailboxesProvider);
+ }
+
+ @Test
+ void deserializeListenerGroup() throws Exception {
+ assertThat(Group.deserialize("org.apache.james.rspamd.RSpamDListener$RSpamDListenerGroup"))
+ .isEqualTo(new RSpamDListener.RSpamDListenerGroup());
+ }
+
+ @Test
+ void isEventOnSpamMailboxShouldReturnFalseWhenMessageIsMovedToANonSpamMailbox() {
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(mailboxId1)
+ .targetMailboxIds(mailboxId2)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ assertThat(listener.isMessageMovedToSpamMailbox(messageMoveEvent).block()).isFalse();
+ }
+
+ @Test
+ void isEventOnSpamMailboxShouldReturnTrueWhenMailboxIsSpam() {
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(mailboxId1)
+ .targetMailboxIds(spamMailboxId)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ assertThat(listener.isMessageMovedToSpamMailbox(messageMoveEvent).block()).isTrue();
+ }
+
+ @Test
+ void isEventOnSpamMailboxShouldReturnFalseWhenMailboxIsSpamOtherCase() {
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(mailboxId1)
+ .targetMailboxIds(spamCapitalMailboxId)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ assertThat(listener.isMessageMovedToSpamMailbox(messageMoveEvent).block()).isFalse();
+ }
+
+ @Test
+ void isMessageMovedOutOfSpamMailboxShouldReturnFalseWhenMessageMovedBetweenNonSpamMailboxes() {
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(mailboxId1)
+ .targetMailboxIds(mailboxId2)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ assertThat(listener.isMessageMovedOutOfSpamMailbox(messageMoveEvent).block()).isFalse();
+ }
+
+ @Test
+ void isMessageMovedOutOfSpamMailboxShouldReturnFalseWhenMessageMovedOutOfCapitalSpamMailbox() {
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(spamCapitalMailboxId)
+ .targetMailboxIds(mailboxId2)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ assertThat(listener.isMessageMovedOutOfSpamMailbox(messageMoveEvent).block()).isFalse();
+ }
+
+ @Test
+ void isMessageMovedOutOfSpamMailboxShouldReturnTrueWhenMessageMovedOutOfSpamMailbox() {
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(spamMailboxId)
+ .targetMailboxIds(mailboxId2)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ assertThat(listener.isMessageMovedOutOfSpamMailbox(messageMoveEvent).block()).isTrue();
+ }
+
+ @Test
+ void isMessageMovedOutOfSpamMailboxShouldReturnFalseWhenMessageMovedToTrash() {
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(spamMailboxId)
+ .targetMailboxIds(trashMailboxId)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ assertThat(listener.isMessageMovedOutOfSpamMailbox(messageMoveEvent).block()).isFalse();
+ }
+
+
+ @Test
+ void eventShouldCallReportSpamLearningWhenTheMovedEventMatches() throws Exception {
+ createMessage(inbox);
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(mailboxId1)
+ .targetMailboxIds(spamMailboxId)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ listener.event(messageMoveEvent);
+
+ verify(rSpamDHttpClient).reportAsSpam(any());
+ }
+
+ @Test
+ void eventShouldCallHamLearningWhenTheMovedEventMatches() throws Exception {
+ createMessage(inbox);
+
+ MessageMoveEvent messageMoveEvent = MessageMoveEvent.builder()
+ .session(MAILBOX_SESSION)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(spamMailboxId)
+ .targetMailboxIds(mailboxId1)
+ .build())
+ .messageId(MESSAGE_ID)
+ .build();
+
+ listener.event(messageMoveEvent);
+
+ verify(rSpamDHttpClient).reportAsHam(any());
+ }
+
+ @Test
+ void eventShouldCallReportHamLearningWhenTheMessageIsAddedInInbox() throws Exception {
+ SimpleMailboxMessage message = createMessage(inbox);
+
+ MailboxEvents.Added addedEvent = EventFactory.added()
+ .randomEventId()
+ .mailboxSession(MAILBOX_SESSION)
+ .mailbox(inbox)
+ .addMetaData(message.metaData())
+ .build();
+
+ listener.event(addedEvent);
+
+ verify(rSpamDHttpClient).reportAsHam(any());
+ }
+
+ @Test
+ void eventShouldNotCallReportHamLearningWhenTheMessageIsAddedInAMailboxOtherThanInbox() throws Exception {
+ SimpleMailboxMessage message = createMessage(mailbox1);
+
+ MailboxEvents.Added addedEvent = EventFactory.added()
+ .randomEventId()
+ .mailboxSession(MAILBOX_SESSION)
+ .mailbox(mailbox1)
+ .addMetaData(message.metaData())
+ .build();
+
+ listener.event(addedEvent);
+
+ verify(rSpamDHttpClient, never()).reportAsHam(any());
+ }
+
+ private SimpleMailboxMessage createMessage(Mailbox mailbox) throws MailboxException {
+ int size = 45;
+ int bodyStartOctet = 25;
+ byte[] content = "Subject: test\r\n\r\nBody\r\n".getBytes(StandardCharsets.UTF_8);
+ SimpleMailboxMessage message = new SimpleMailboxMessage(MESSAGE_ID, THREAD_ID, new Date(),
+ size, bodyStartOctet, new ByteContent(content), new Flags(), new PropertyBuilder().build(),
+ mailbox.getMailboxId());
+ MessageMetaData messageMetaData = mapperFactory.createMessageMapper(null).add(mailbox, message);
+ message.setUid(messageMetaData.getUid());
+ return message;
+ }
+}