blob: 5a242f3f2c93c96cc9e4d9dbb11c28b295943202 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.rspamd;
import java.io.InputStream;
import javax.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.core.Username;
import org.apache.james.events.Event;
import org.apache.james.events.EventListener;
import org.apache.james.events.Group;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.Role;
import org.apache.james.mailbox.SystemMailboxesProvider;
import org.apache.james.mailbox.events.MailboxEvents;
import org.apache.james.mailbox.events.MessageMoveEvent;
import org.apache.james.mailbox.model.MessageRange;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.event.SpamEventListener;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.rspamd.client.RSpamDHttpClient;
import org.apache.james.util.FunctionalUtils;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class RSpamDListener implements SpamEventListener, EventListener.ReactiveGroupEventListener {
public static class RSpamDListenerGroup extends Group {
}
private static final Logger LOGGER = LoggerFactory.getLogger(RSpamDListener.class);
private static final int LIMIT = 1;
private static final Group GROUP = new RSpamDListenerGroup();
private final RSpamDHttpClient rSpamDHttpClient;
private final MailboxManager mailboxManager;
private final MailboxSessionMapperFactory mapperFactory;
private final SystemMailboxesProvider systemMailboxesProvider;
@Inject
public RSpamDListener(RSpamDHttpClient rSpamDHttpClient, MailboxManager mailboxManager, MailboxSessionMapperFactory mapperFactory, SystemMailboxesProvider systemMailboxesProvider) {
this.rSpamDHttpClient = rSpamDHttpClient;
this.mailboxManager = mailboxManager;
this.mapperFactory = mapperFactory;
this.systemMailboxesProvider = systemMailboxesProvider;
}
@Override
public Group getDefaultGroup() {
return GROUP;
}
@Override
public boolean isHandling(Event event) {
return event instanceof MessageMoveEvent || event instanceof MailboxEvents.Added;
}
@Override
public Publisher<Void> reactiveEvent(Event event) {
if (event instanceof MessageMoveEvent) {
return handleMessageMoved((MessageMoveEvent) event);
} else if (event instanceof MailboxEvents.Added) {
return handleMessageAdded((MailboxEvents.Added) event);
}
return Mono.empty();
}
private Mono<Void> handleMessageAdded(MailboxEvents.Added addedEvent) {
return isAppendedToInbox(addedEvent)
.filter(FunctionalUtils.identityPredicate())
.doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", addedEvent.getEventId().getId()))
.flatMap(any -> reportHamWhenAdded(addedEvent, mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))));
}
private Mono<Void> handleMessageMoved(MessageMoveEvent messageMoveEvent) {
return handleMessageMoved(mailboxMessagePublisher(messageMoveEvent), messageMoveEvent);
}
private Mono<Void> reportHamWhenAdded(MailboxEvents.Added addedEvent, MailboxSession session) {
return mapperFactory.getMailboxMapper(session)
.findMailboxById(addedEvent.getMailboxId())
.map(mailbox -> Pair.of(mailbox, mapperFactory.getMessageMapper(session)))
.flatMapMany(pair -> Flux.fromIterable(MessageRange.toRanges(addedEvent.getUids()))
.flatMap(range -> pair.getRight().findInMailboxReactive(pair.getLeft(), range, MessageMapper.FetchType.FULL, LIMIT)))
.map(Throwing.function(MailboxMessage::getFullContent))
.flatMap(rSpamDHttpClient::reportAsHam, ReactorUtils.DEFAULT_CONCURRENCY)
.then();
}
private Flux<InputStream> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) {
return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))))
.flatMapMany(messageIdMapper -> messageIdMapper.findReactive(messageMoveEvent.getMessageIds(), MessageMapper.FetchType.FULL))
.map(Throwing.function(MailboxMessage::getFullContent));
}
private Mono<Void> handleMessageMoved(Flux<InputStream> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
Mono<Boolean> reportHamIfNotSpamDetected = isMessageMovedOutOfSpamMailbox(messageMoveEvent)
.filter(FunctionalUtils.identityPredicate())
.doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", messageMoveEvent.getEventId().getId()));
return isMessageMovedToSpamMailbox(messageMoveEvent)
.flatMap(isSpam -> {
if (isSpam) {
LOGGER.debug("Spam event detected, EventId = {}", messageMoveEvent.getEventId().getId());
return mailboxMessagesPublisher.flatMap(rSpamDHttpClient::reportAsSpam, ReactorUtils.DEFAULT_CONCURRENCY)
.then();
} else {
return reportHamIfNotSpamDetected
.flatMapMany(isHam -> mailboxMessagesPublisher.flatMap(rSpamDHttpClient::reportAsHam, ReactorUtils.DEFAULT_CONCURRENCY))
.then();
}
});
}
@VisibleForTesting
Mono<Boolean> isMessageMovedToSpamMailbox(MessageMoveEvent event) {
return isMessageMovedToMailbox(event, Role.SPAM);
}
@VisibleForTesting
Mono<Boolean> isMessageMovedOutOfSpamMailbox(MessageMoveEvent event) {
return isMessageMovedOutToMailbox(event, Role.SPAM)
.zipWith(isMessageMovedToMailbox(event, Role.TRASH))
.map(tuple -> tuple.getT1() && !tuple.getT2());
}
@VisibleForTesting
Mono<Boolean> isAppendedToInbox(MailboxEvents.Added addedEvent) {
return Flux.from(systemMailboxesProvider.getMailboxByRole(Role.INBOX, addedEvent.getUsername()))
.next()
.map(MessageManager::getId)
.map(mailboxId -> mailboxId.equals(addedEvent.getMailboxId()))
.onErrorResume(e -> {
LOGGER.warn("Could not resolve Inbox mailbox", e);
return Mono.just(false);
});
}
private Mono<Boolean> isMessageMovedToMailbox(MessageMoveEvent event, Role role) {
return Flux.from(systemMailboxesProvider.getMailboxByRole(role, event.getUsername()))
.next()
.map(MessageManager::getId)
.map(spamMailboxId -> event.getMessageMoves().addedMailboxIds().contains(spamMailboxId))
.onErrorResume(e -> {
LOGGER.warn("Could not resolve {} mailbox", role, e);
return Mono.just(false);
});
}
private Mono<Boolean> isMessageMovedOutToMailbox(MessageMoveEvent event, Role role) {
return Flux.from(systemMailboxesProvider.getMailboxByRole(role, event.getUsername()))
.next()
.map(MessageManager::getId)
.map(spamMailboxId -> event.getMessageMoves().removedMailboxIds().contains(spamMailboxId))
.onErrorResume(e -> {
LOGGER.warn("Could not resolve {} mailbox", role, e);
return Mono.just(false);
});
}
}