blob: 9ae75a0c73a200a3756db84189047e6b6c2c9575 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.transport.mailets.remote.delivery;
import static org.apache.james.transport.mailets.remote.delivery.Bouncer.IS_DELIVERY_PERMANENT_ERROR;
import java.io.Closeable;
import java.time.Duration;
import java.util.Date;
import java.util.Optional;
import java.util.function.Supplier;
import jakarta.mail.MessagingException;
import jakarta.mail.internet.MimeMessage;
import org.apache.commons.lang3.StringUtils;
import org.apache.james.dnsservice.api.DNSService;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailPrioritySupport;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.util.AuditTrail;
import org.apache.james.util.MDCBuilder;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.apache.mailet.MailetContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
public class DeliveryRunnable implements Disposable {
private static final Logger LOGGER = LoggerFactory.getLogger(DeliveryRunnable.class);
public static final Supplier<Date> CURRENT_DATE_SUPPLIER = Date::new;
public static final String OUTGOING_MAILS = "outgoingMails";
public static final String REMOTE_DELIVERY_TRIAL = "RemoteDeliveryTrial";
private final MailQueue queue;
private final RemoteDeliveryConfiguration configuration;
private final Metric outgoingMailsMetric;
private final MetricFactory metricFactory;
private final Bouncer bouncer;
private final MailDelivrer mailDelivrer;
private final Supplier<Date> dateSupplier;
private final MailetContext mailetContext;
private Disposable disposable;
private Scheduler remoteDeliveryProcessScheduler;
private Scheduler remoteDeliveryDequeueScheduler;
public DeliveryRunnable(MailQueue queue, RemoteDeliveryConfiguration configuration, DNSService dnsServer, MetricFactory metricFactory,
MailetContext mailetContext, Bouncer bouncer) {
this(queue, configuration, metricFactory, bouncer,
new MailDelivrer(configuration, new MailDelivrerToHost(configuration, mailetContext), dnsServer, bouncer, mailetContext),
CURRENT_DATE_SUPPLIER, mailetContext);
}
@VisibleForTesting
DeliveryRunnable(MailQueue queue, RemoteDeliveryConfiguration configuration, MetricFactory metricFactory, Bouncer bouncer,
MailDelivrer mailDelivrer, Supplier<Date> dateSupplier, MailetContext mailetContext) {
this.queue = queue;
this.configuration = configuration;
this.outgoingMailsMetric = metricFactory.generate(OUTGOING_MAILS);
this.bouncer = bouncer;
this.mailDelivrer = mailDelivrer;
this.dateSupplier = dateSupplier;
this.metricFactory = metricFactory;
this.mailetContext = mailetContext;
}
public void start() {
remoteDeliveryProcessScheduler = Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "RemoteDelivery-Process");
remoteDeliveryDequeueScheduler = Schedulers.newSingle("RemoteDelivery-Dequeue");
disposable = Flux.from(queue.deQueue())
.flatMap(queueItem -> runStep(queueItem).subscribeOn(remoteDeliveryProcessScheduler), Queues.SMALL_BUFFER_SIZE)
.onErrorContinue(((throwable, nothing) -> LOGGER.error("Exception caught in RemoteDelivery", throwable)))
.subscribeOn(remoteDeliveryDequeueScheduler)
.subscribe();
}
private Mono<Void> runStep(MailQueue.MailQueueItem queueItem) {
TimeMetric timeMetric = metricFactory.timer(REMOTE_DELIVERY_TRIAL);
return processMail(queueItem)
.doOnSuccess(any -> timeMetric.stopAndPublish());
}
private Mono<Void> processMail(MailQueue.MailQueueItem queueItem) {
return Mono.create(sink -> {
Mail mail = queueItem.getMail();
try (Closeable closeable =
MDCBuilder.create()
.addToContext("mail", mail.getName())
.addToContext("recipients", ImmutableList.copyOf(mail.getRecipients()).toString())
.addToContext("sender", mail.getMaybeSender().asString())
.build()) {
LOGGER.debug("will process mail {}", mail.getName());
attemptDelivery(mail);
queueItem.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS);
sink.success();
} catch (Exception e) {
try {
// Prevent unexpected exceptions from causing looping by removing message from outgoing.
// DO NOT CHANGE THIS to catch Error!
// For example, if there were an OutOfMemory condition caused because
// something else in the server was abusing memory, we would not want to start purging the retrying spool!
AuditTrail.entry()
.protocol("mailetcontainer")
.action("RemoteDelivery")
.parameters(Throwing.supplier(() -> ImmutableMap.of("mailId", mail.getName(),
"mimeMessageId", Optional.ofNullable(mail.getMessage())
.map(Throwing.function(MimeMessage::getMessageID))
.orElse(""),
"sender", mail.getMaybeSender().asString(),
"recipients", StringUtils.join(mail.getRecipients()))))
.log("Remote delivering mail failed temporarily.");
queueItem.done(MailQueue.MailQueueItem.CompletionStatus.RETRY);
} catch (Exception ex) {
sink.error(ex);
return;
}
sink.error(e);
} finally {
LifecycleUtil.dispose(mail);
}
});
}
@VisibleForTesting
void attemptDelivery(Mail mail) throws MessagingException {
ExecutionResult executionResult = mailDelivrer.deliver(mail);
switch (executionResult.getExecutionState()) {
case SUCCESS:
outgoingMailsMetric.increment();
configuration.getOnSuccess()
.ifPresent(Throwing.consumer(onSuccess -> mailetContext.sendMail(mail, onSuccess.getValue())));
AuditTrail.entry()
.protocol("mailetcontainer")
.action("RemoteDelivery")
.parameters(Throwing.supplier(() -> ImmutableMap.of("mailId", mail.getName(),
"mimeMessageId", Optional.ofNullable(mail.getMessage())
.map(Throwing.function(MimeMessage::getMessageID))
.orElse(""),
"sender", mail.getMaybeSender().asString(),
"recipients", StringUtils.join(mail.getRecipients()))))
.log("Remote delivering mail succeeded.");
break;
case TEMPORARY_FAILURE:
AuditTrail.entry()
.protocol("mailetcontainer")
.action("RemoteDelivery")
.parameters(Throwing.supplier(() -> ImmutableMap.of("mailId", mail.getName(),
"mimeMessageId", Optional.ofNullable(mail.getMessage())
.map(Throwing.function(MimeMessage::getMessageID))
.orElse(""),
"sender", mail.getMaybeSender().asString(),
"recipients", StringUtils.join(mail.getRecipients()))))
.log("Remote delivering mail failed temporarily.");
handleTemporaryFailure(mail, executionResult);
break;
case PERMANENT_FAILURE:
AuditTrail.entry()
.protocol("mailetcontainer")
.action("RemoteDelivery")
.parameters(Throwing.supplier(() -> ImmutableMap.of("mailId", mail.getName(),
"mimeMessageId", Optional.ofNullable(mail.getMessage())
.map(Throwing.function(MimeMessage::getMessageID))
.orElse(""),
"sender", mail.getMaybeSender().asString(),
"recipients", StringUtils.join(mail.getRecipients()))))
.log("Remote delivering mail failed permanently.");
handlePermanentFailure(mail, executionResult);
break;
}
}
private void handlePermanentFailure(Mail mail, ExecutionResult executionResult) throws MessagingException {
mail.setAttribute(new Attribute(IS_DELIVERY_PERMANENT_ERROR, AttributeValue.of(true)));
bouncer.bounce(mail, executionResult.getException().orElse(null));
}
private void handleTemporaryFailure(Mail mail, ExecutionResult executionResult) throws MessagingException {
if (!mail.getState().equals(Mail.ERROR)) {
mail.setState(Mail.ERROR);
DeliveryRetriesHelper.initRetries(mail);
mail.setLastUpdated(dateSupplier.get());
}
mail.setAttribute(new Attribute(IS_DELIVERY_PERMANENT_ERROR, AttributeValue.of(false)));
int retries = DeliveryRetriesHelper.retrieveRetries(mail);
if (retries < configuration.getMaxRetries()) {
reAttemptDelivery(mail, retries);
} else {
LOGGER.debug("Bouncing message {} after {} retries", mail.getName(), retries);
bouncer.bounce(mail, new Exception("Too many retries failure. Bouncing after " + retries + " retries.", executionResult.getException().orElse(null)));
AuditTrail.entry()
.protocol("mailetcontainer")
.action("RemoteDelivery")
.parameters(Throwing.supplier(() -> ImmutableMap.of("mailId", mail.getName(),
"mimeMessageId", Optional.ofNullable(mail.getMessage())
.map(Throwing.function(MimeMessage::getMessageID))
.orElse(""),
"sender", mail.getMaybeSender().asString(),
"recipients", StringUtils.join(mail.getRecipients()))))
.log("Remote delivering mail failed after maximum retries.");
}
}
private void reAttemptDelivery(Mail mail, int retries) throws MailQueue.MailQueueException {
LOGGER.debug("Storing message {} into outgoing after {} retries", mail.getName(), retries);
DeliveryRetriesHelper.incrementRetries(mail);
mail.setLastUpdated(dateSupplier.get());
// Something happened that will delay delivery. Store it back in the retry repository.
Duration delay = getNextDelay(DeliveryRetriesHelper.retrieveRetries(mail));
if (configuration.isUsePriority()) {
// Use lowest priority for retries. See JAMES-1311
mail.setAttribute(MailPrioritySupport.LOW_PRIORITY_ATTRIBUTE);
}
queue.enQueue(mail, delay);
}
private Duration getNextDelay(int retryCount) {
if (retryCount > configuration.getDelayTimes().size()) {
return Delay.DEFAULT_DELAY_TIME;
}
return configuration.getDelayTimes().get(retryCount - 1);
}
@Override
public void dispose() {
disposable.dispose();
remoteDeliveryDequeueScheduler.dispose();
remoteDeliveryProcessScheduler.disposeGracefully()
.timeout(Duration.ofSeconds(2))
.onErrorResume(e -> Mono.empty())
.block();
}
}