blob: 3fb639c26dac318157b1fe6f00b5fd88cfcd6d83 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.mailetcontainer.impl;
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.mail.MessagingException;
import org.apache.commons.configuration2.HierarchicalConfiguration;
import org.apache.commons.configuration2.tree.ImmutableNode;
import org.apache.james.core.MailAddress;
import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.jmx.MailSpoolerMBean;
import org.apache.james.mailrepository.api.MailRepository;
import org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.mailrepository.api.MailRepositoryStore;
import org.apache.james.mailrepository.api.MailRepositoryUrl;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueue.MailQueueItem;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeName;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
* Manages the mail spool. This class is responsible for retrieving messages
* from the spool, directing messages to the appropriate processor, and removing
* them from the spool when processing is complete.
*/
public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMBean {
private static class Runner {
// Number of mails currently being processed; published as the ".inFlight" gauge and returned by getCurrentSpoolCount().
private final AtomicInteger processingActive = new AtomicInteger(0);
private final MetricFactory metricFactory;
private final MailProcessor mailProcessor;
// Repository that receives mails whose failure count exceeds MAXIMUM_FAILURE_COUNT.
private final MailRepository errorRepository;
// Handle on the dequeue subscription started by run(); disposed in dispose().
private final reactor.core.Disposable disposable;
private final MailQueue queue;
private final Configuration configuration;
// Scheduler on which each dequeued item is handled.
private final Scheduler scheduler;
// Scheduler on which the dequeue flux itself is subscribed.
private final Scheduler queueScheduler;
private Runner(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor,
MailRepository errorRepository, MailQueue queue, Configuration configuration) {
this.metricFactory = metricFactory;
this.mailProcessor = mailProcessor;
this.errorRepository = errorRepository;
this.queue = queue;
this.configuration = configuration;
scheduler = Schedulers.newBoundedElastic(configuration.getConcurrencyLevel() + 1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"spooler");
queueScheduler = Schedulers.newBoundedElastic(1, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"queueScheduler");
this.disposable = run(queue);
gaugeRegistry.register(SPOOL_PROCESSING + ".inFlight",
processingActive::get);
}
/**
 * Subscribes to the queue and handles each dequeued item, with at most
 * {@code configuration.getConcurrencyLevel()} items handled concurrently.
 *
 * @param queue queue to consume
 * @return handle allowing the consumption to be cancelled
 */
private reactor.core.Disposable run(MailQueue queue) {
    int maxConcurrency = configuration.getConcurrencyLevel();
    Flux<Void> consumption = Flux.from(queue.deQueue())
        .flatMap(dequeued -> handleOnQueueItem(dequeued).subscribeOn(scheduler), maxConcurrency)
        .onErrorContinue((error, element) -> LOGGER.error("Exception processing mail while spooling {}", element, error))
        .subscribeOn(queueScheduler);
    return consumption.subscribe();
}
/**
 * Wraps the processing of one queue item with metrics: the in-flight counter is incremented on
 * subscription and decremented on termination, and the timer is published on success.
 *
 * @param queueItem dequeued item to process
 * @return a Mono completing once the item has been handled
 */
private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) {
    TimeMetric processingTimer = metricFactory.timer(SPOOL_PROCESSING);
    Mono<Void> trackedProcessing = Mono.fromCallable(() -> processingActive.incrementAndGet())
        .flatMap(inFlight -> processMail(queueItem));
    return trackedProcessing
        .doOnSuccess(ignored -> processingTimer.stopAndPublish())
        .doOnTerminate(() -> processingActive.decrementAndGet());
}
/**
 * Processes the mail carried by the queue item and passes it to {@code LifecycleUtil.dispose} afterwards.
 *
 * @param queueItem dequeued item whose mail should be processed
 * @return a Mono completing once processing has run
 */
private Mono<Void> processMail(MailQueueItem queueItem) {
    return Mono.using(
        () -> queueItem.getMail(),
        dequeuedMail -> Mono.fromRunnable(() -> performProcessMail(queueItem, dequeuedMail)),
        dequeuedMail -> LifecycleUtil.dispose(dequeuedMail));
}
/**
 * Hands the mail to the mail processor and acknowledges the queue item on success.
 * Any throwable, including the interruption signalled after processing, is routed to
 * {@code handleError}.
 *
 * @param queueItem queue item to acknowledge
 * @param mail mail to process
 */
private void performProcessMail(MailQueueItem queueItem, Mail mail) {
    LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
    // Snapshot taken up front so that error handling can restore the recipients.
    ImmutableList<MailAddress> recipientsBeforeProcessing = ImmutableList.copyOf(mail.getRecipients());
    try {
        mailProcessor.service(mail);
        boolean interrupted = Thread.currentThread().isInterrupted();
        if (interrupted) {
            throw new InterruptedException("Thread has been interrupted");
        }
        queueItem.done(MailQueueItem.CompletionStatus.SUCCESS);
    } catch (Throwable failure) {
        handleError(queueItem, mail, recipientsBeforeProcessing, failure);
    } finally {
        // The name is read again here rather than cached.
        LOGGER.debug("==== End processing mail {} ====", mail.getName());
    }
}
/**
 * Applies the spooler failure policy to a mail whose processing threw.
 * <p>
 * The recipients captured before processing are restored on the queue item's mail. The mail is then
 * stored in the error repository when its failure count exceeds {@code MAXIMUM_FAILURE_COUNT}, or
 * re-enqueued with the incremented failure count otherwise. If that handling itself fails, the queue
 * item is rejected.
 *
 * @param queueItem queue item being processed
 * @param mail mail whose processing failed
 * @param originalRecipients recipients captured before processing started
 * @param processingException failure raised while processing the mail
 */
private void handleError(MailQueueItem queueItem, Mail mail, ImmutableList<MailAddress> originalRecipients, Throwable processingException) {
    int failureCount = computeFailureCount(mail);

    // Restore original recipients
    queueItem.getMail().setRecipients(originalRecipients);

    try {
        if (failureCount > MAXIMUM_FAILURE_COUNT) {
            // The exception is passed as the trailing argument so that the cause of the final failure is logged too.
            LOGGER.error("Failed {} processing {} consecutive times. Abort. Mail is saved in {}",
                mail.getName(), failureCount, configuration.getErrorRepositoryURL().asString(), processingException);
            storeInErrorRepository(queueItem);
        } else {
            LOGGER.error("Failed {} processing {} consecutive times. Mail is requeued with increased failure count.",
                mail.getName(), failureCount, processingException);
            reEnqueue(queueItem, failureCount);
        }
    } catch (Exception nestedE) {
        LOGGER.error("Could not apply standard error handling for {}, defaulting to nack", mail.getName(), nestedE);
        nack(queueItem, processingException);
    }
}
/**
 * Computes the failure count to record for the current failure.
 *
 * @param mail mail that failed processing
 * @return the count stored in the {@code MAIL_PROCESSING_ERROR_COUNT} attribute plus one,
 *         or 1 when the attribute is absent or not an integer
 */
private int computeFailureCount(Mail mail) {
    return mail.getAttribute(MAIL_PROCESSING_ERROR_COUNT)
        .flatMap(errorCount -> errorCount.getValue().valueAs(Integer.class))
        .map(previousFailures -> previousFailures + 1)
        .orElse(1);
}
/**
 * Records the failure count on the mail, enqueues it again and acknowledges the current queue item.
 *
 * @param queueItem queue item carrying the failed mail
 * @param failureCount failure count to store on the mail
 * @throws MailQueue.MailQueueException when enqueuing or acknowledging fails
 */
private void reEnqueue(MailQueueItem queueItem, int failureCount) throws MailQueue.MailQueueException {
    Attribute failureCountAttribute = new Attribute(MAIL_PROCESSING_ERROR_COUNT, AttributeValue.of(failureCount));
    Mail failedMail = queueItem.getMail();
    failedMail.setAttribute(failureCountAttribute);

    queue.enQueue(failedMail);
    queueItem.done(MailQueueItem.CompletionStatus.SUCCESS);
}
/**
 * Stores the mail in the error repository, then acknowledges the queue item.
 *
 * @param queueItem queue item carrying the mail to store
 * @throws MessagingException when storing or acknowledging fails
 */
private void storeInErrorRepository(MailQueueItem queueItem) throws MessagingException {
    Mail abandonedMail = queueItem.getMail();
    errorRepository.store(abandonedMail);
    queueItem.done(MailQueueItem.CompletionStatus.SUCCESS);
}
/**
 * Rejects the queue item as a last resort.
 *
 * @param queueItem queue item to reject
 * @param processingException original processing failure that led to the rejection
 * @throws RuntimeException when the rejection itself fails; its cause is {@code processingException}
 *         and the rejection failure is attached as a suppressed exception
 */
private void nack(MailQueueItem queueItem, Throwable processingException) {
    try {
        queueItem.done(MailQueueItem.CompletionStatus.REJECT);
    } catch (MailQueue.MailQueueException ex) {
        // Keep the original processing failure as the cause, and keep the nack failure too
        // instead of dropping it.
        RuntimeException failure = new RuntimeException(processingException);
        failure.addSuppressed(ex);
        throw failure;
    }
}
/**
 * Stops this runner: disposes the queue scheduler, waits up to five seconds for the processing
 * scheduler to shut down gracefully (forcing the shutdown otherwise), cancels the dequeue
 * subscription and closes the queue.
 */
public void dispose() {
    LOGGER.info("start dispose() ...");
    LOGGER.info("Cancel queue consumption...");
    queueScheduler.dispose();
    LOGGER.info("Queue consumption canceled, shutting down processor threads...");
    scheduler.disposeGracefully()
        .timeout(Duration.ofSeconds(5))
        .onErrorResume(e -> {
            // A graceful shutdown that times out or fails does not force the scheduler to stop;
            // dispose it explicitly so that it is shut down, and report why.
            LOGGER.warn("Processor threads did not shut down gracefully, forcing shutdown", e);
            scheduler.dispose();
            return Mono.empty();
        })
        .block();
    disposable.dispose();
    LOGGER.info("Thread shutdown completed. Turning off mail queue.");
    try {
        queue.close();
    } catch (IOException e) {
        LOGGER.debug("error closing queue", e);
    }
}
/**
 * @return the number of mails currently being processed by this runner
 */
public int getCurrentSpoolCount() {
    return processingActive.intValue();
}
}
/**
 * Spooler settings: the processing concurrency level and the URL of the error repository.
 */
public static class Configuration {
    /**
     * Reads the spooler settings from the given configuration.
     * <p>
     * {@code threads} defaults to 100. When {@code errorRepository} is absent, the URL is built from
     * the default protocol of the mail repository store and {@code ERROR_REPOSITORY_PATH}.
     *
     * @param mailRepositoryStore store providing the default protocol
     * @param config configuration node to read {@code threads} and {@code errorRepository} from
     * @return the parsed configuration
     * @throws IllegalStateException when {@code errorRepository} is absent and the store has no default protocol
     * @throws IllegalArgumentException when {@code threads} is negative
     */
    public static Configuration from(MailRepositoryStore mailRepositoryStore, HierarchicalConfiguration<ImmutableNode> config) {
        int concurrencyLevel = config.getInt("threads", 100);
        MailRepositoryUrl errorRepositoryURL = Optional.ofNullable(config.getString("errorRepository", null))
            .map(MailRepositoryUrl::from)
            .orElseGet(() -> MailRepositoryUrl.fromPathAndProtocol(
                mailRepositoryStore.defaultProtocol()
                    .orElseThrow(() -> new IllegalStateException("Cannot retrieve mailRepository URL, you need to configure an `errorRepository` property for the spooler.")),
                ERROR_REPOSITORY_PATH));

        return new Configuration(concurrencyLevel, errorRepositoryURL);
    }

    private final int concurrencyLevel;
    private final MailRepositoryUrl errorRepositoryURL;

    /**
     * @param concurrencyLevel number of mails processed concurrently; zero disables the spooler
     * @param errorRepositoryURL URL of the repository receiving mails that keep failing
     * @throws IllegalArgumentException when {@code concurrencyLevel} is negative
     */
    public Configuration(int concurrencyLevel, MailRepositoryUrl errorRepositoryURL) {
        Preconditions.checkArgument(concurrencyLevel >= 0, "'threads' needs to be greater than or equal to zero");

        this.concurrencyLevel = concurrencyLevel;
        this.errorRepositoryURL = errorRepositoryURL;
    }

    /**
     * @return the configured concurrency level
     */
    public int getConcurrencyLevel() {
        return concurrencyLevel;
    }

    /**
     * @return {@code true} when the concurrency level is greater than zero
     */
    public boolean isEnabled() {
        return concurrencyLevel > 0;
    }

    /**
     * @return the URL of the error repository
     */
    public MailRepositoryUrl getErrorRepositoryURL() {
        return errorRepositoryURL;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
            .add("concurrencyLevel", concurrencyLevel)
            .add("errorRepositoryURL", errorRepositoryURL)
            .toString();
    }
}
private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);

/** Name of the processing timer metric; also the prefix of the ".inFlight" gauge. */
public static final String SPOOL_PROCESSING = "spoolProcessing";
/** Mail attribute holding the number of consecutive processing failures. */
public static final AttributeName MAIL_PROCESSING_ERROR_COUNT = AttributeName.of("mail-processing-error-count");
/** Path used for the error repository when no {@code errorRepository} property is configured. */
public static final MailRepositoryPath ERROR_REPOSITORY_PATH = MailRepositoryPath.from("var/mail/error");
/** Failure count above which a mail is stored in the error repository instead of being re-enqueued. */
public static final int MAXIMUM_FAILURE_COUNT = 5;

private final MetricFactory metricFactory;
private final GaugeRegistry gaugeRegistry;
private final MailProcessor mailProcessor;
private final MailRepositoryStore mailRepositoryStore;
private final MailQueueFactory<?> queueFactory;
/**
 * Spooler settings. Holds the concurrency level to use for dequeuing mails from spool, which allows
 * throttling the resources dedicated to that async process.
 */
private Configuration configuration;
// Empty until init() starts a runner. It must never be null: dispose() and getCurrentSpoolCount()
// dereference it even when the spooler is disabled or has not been initialized.
private Optional<Runner> runner = Optional.empty();
// Assigned by init() when the spooler is enabled.
private MailQueue queue;
/**
 * Creates a spooler; it does nothing until it has been configured and {@link #init()} has been called.
 *
 * @param metricFactory factory used to time mail processing
 * @param gaugeRegistry registry receiving the in-flight gauge
 * @param mailProcessor processor each dequeued mail is handed to
 * @param mailRepositoryStore store used to resolve the error repository
 * @param queueFactory factory used to create the spool queue
 */
@Inject
public JamesMailSpooler(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailProcessor mailProcessor, MailRepositoryStore mailRepositoryStore, MailQueueFactory<?> queueFactory) {
    // Queue and storage collaborators
    this.queueFactory = queueFactory;
    this.mailRepositoryStore = mailRepositoryStore;
    // Processing collaborator
    this.mailProcessor = mailProcessor;
    // Metrics collaborators
    this.gaugeRegistry = gaugeRegistry;
    this.metricFactory = metricFactory;
}
/**
 * Replaces the current runner with a fresh one consuming the same queue, then disposes the previous runner.
 * <p>
 * Does nothing when no queue has been created, i.e. when {@link #init()} has not run or the spooler
 * is disabled.
 */
public void restart() {
    if (queue == null) {
        // Without a queue the Runner constructor would allocate schedulers and then fail on queue.deQueue().
        LOGGER.warn("Spooler is not running, restart request ignored");
        return;
    }
    // NOTE(review): Runner.dispose() closes the queue, which the new runner also uses; confirm that
    // closing is harmless for the configured queue implementation.
    Optional<Runner> previous = runner;
    runner = Optional.of(new Runner(metricFactory, gaugeRegistry, mailProcessor, errorRepository(), queue, configuration));
    if (previous != null) {
        previous.ifPresent(Runner::dispose);
    }
}
/**
 * Parses the given configuration node and applies the resulting spooler settings.
 *
 * @param config configuration node to read the spooler settings from
 */
@Override
public void configure(HierarchicalConfiguration<ImmutableNode> config) {
    Configuration parsedConfiguration = Configuration.from(mailRepositoryStore, config);
    configure(parsedConfiguration);
}
/**
 * Sets the spooler settings read by {@link #init()}.
 *
 * @param configuration spooler settings to use
 */
public void configure(Configuration configuration) {
this.configuration = configuration;
}
/**
 * Starts the spooler: creates the spool queue and a runner consuming it.
 * Only logs a message when the configuration disables the spooler.
 */
@PostConstruct
public void init() {
    if (!configuration.isEnabled()) {
        LOGGER.info("Spooler had been deactivated. To enable it set 'threads' count to a value greater than zero");
        return;
    }

    LOGGER.info("init...");
    int concurrencyLevel = configuration.getConcurrencyLevel();
    LOGGER.info("Concurrency level is {}", concurrencyLevel);

    queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel));
    Runner startedRunner = new Runner(metricFactory, gaugeRegistry, mailProcessor, errorRepository(), queue, configuration);
    runner = Optional.of(startedRunner);

    LOGGER.info("Spooler started");
}
/**
 * Resolves the error repository from the configured URL.
 *
 * @return the repository selected for the configured error repository URL
 * @throws RuntimeException wrapping the store exception when the repository cannot be selected
 */
private MailRepository errorRepository() {
    MailRepositoryUrl errorRepositoryUrl = configuration.getErrorRepositoryURL();
    try {
        return mailRepositoryStore.select(errorRepositoryUrl);
    } catch (MailRepositoryStore.MailRepositoryStoreException selectionFailure) {
        throw new RuntimeException(selectionFailure);
    }
}
/**
 * The dispose operation is called at the end of a component's lifecycle.
 * <p>
 * This implementation disposes the current runner, if one has been started.
 */
@PreDestroy
@Override
public void dispose() {
    runner.ifPresent(activeRunner -> activeRunner.dispose());
}
/**
 * @return the configured concurrency level
 */
@Override
public int getThreadCount() {
    int configuredConcurrency = configuration.getConcurrencyLevel();
    return configuredConcurrency;
}
/**
 * @return the number of mails currently being processed by the runner, or 0 when no runner is present
 */
@Override
public int getCurrentSpoolCount() {
    if (runner.isPresent()) {
        return runner.get().getCurrentSpoolCount();
    }
    return 0;
}
}