| /**************************************************************** |
| * Licensed to the Apache Software Foundation (ASF) under one * |
| * or more contributor license agreements. See the NOTICE file * |
| * distributed with this work for additional information * |
| * regarding copyright ownership. The ASF licenses this file * |
| * to you under the Apache License, Version 2.0 (the * |
| * "License"); you may not use this file except in compliance * |
| * with the License. You may obtain a copy of the License at * |
| * * |
| * http://www.apache.org/licenses/LICENSE-2.0 * |
| * * |
| * Unless required by applicable law or agreed to in writing, * |
| * software distributed under the License is distributed on an * |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * |
| * KIND, either express or implied. See the License for the * |
| * specific language governing permissions and limitations * |
| * under the License. * |
| ****************************************************************/ |
| |
| package org.apache.james.queue.rabbitmq; |
| |
| import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX; |
| |
| import java.io.Closeable; |
| import java.util.function.Consumer; |
| |
| import org.apache.james.backends.rabbitmq.ReceiverProvider; |
| import org.apache.james.blob.api.ObjectNotFoundException; |
| import org.apache.james.metrics.api.Metric; |
| import org.apache.james.metrics.api.MetricFactory; |
| import org.apache.james.queue.api.MailQueue; |
| import org.apache.james.queue.api.MailQueueFactory; |
| import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; |
| import org.apache.james.queue.rabbitmq.view.api.MailQueueView; |
| import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; |
| import org.apache.mailet.Mail; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.github.fge.lambdas.Throwing; |
| import com.github.fge.lambdas.consumers.ThrowingConsumer; |
| |
| import reactor.core.publisher.Flux; |
| import reactor.core.publisher.Mono; |
| import reactor.rabbitmq.AcknowledgableDelivery; |
| import reactor.rabbitmq.ConsumeOptions; |
| import reactor.rabbitmq.Receiver; |
| |
| class Dequeuer implements Closeable { |
| private static final Logger LOGGER = LoggerFactory.getLogger(Dequeuer.class); |
| private static final boolean REQUEUE = true; |
| |
| private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { |
| |
| private final Consumer<Boolean> ack; |
| private final EnqueueId enqueueId; |
| private final Mail mail; |
| |
| private RabbitMQMailQueueItem(Consumer<Boolean> ack, MailWithEnqueueId mailWithEnqueueId) { |
| this.ack = ack; |
| this.enqueueId = mailWithEnqueueId.getEnqueueId(); |
| this.mail = mailWithEnqueueId.getMail(); |
| } |
| |
| @Override |
| public Mail getMail() { |
| return mail; |
| } |
| |
| public EnqueueId getEnqueueId() { |
| return enqueueId; |
| } |
| |
| @Override |
| public void done(boolean success) { |
| ack.accept(success); |
| } |
| |
| } |
| |
| private final MailLoader mailLoader; |
| private final Metric dequeueMetric; |
| private final MailReferenceSerializer mailReferenceSerializer; |
| private final MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView; |
| private final Receiver receiver; |
| private final Flux<AcknowledgableDelivery> flux; |
| |
| Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, MailLoader mailLoader, |
| MailReferenceSerializer serializer, MetricFactory metricFactory, |
| MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) { |
| this.mailLoader = mailLoader; |
| this.mailReferenceSerializer = serializer; |
| this.mailQueueView = mailQueueView; |
| this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); |
| this.receiver = receiverProvider.createReceiver(); |
| this.flux = this.receiver |
| .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(prefetchCount.asInt())) |
| .filter(getResponse -> getResponse.getBody() != null); |
| } |
| |
| @Override |
| public void close() { |
| receiver.close(); |
| } |
| |
| Flux<? extends MailQueue.MailQueueItem> deQueue() { |
| return flux.flatMapSequential(this::loadItem) |
| .concatMap(this::filterIfDeleted); |
| } |
| |
| private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) { |
| return mailQueueView.isPresent(item.getEnqueueId()) |
| .handle((isPresent, sink) -> { |
| if (isPresent) { |
| sink.next(item); |
| } else { |
| item.done(true); |
| sink.complete(); |
| } |
| }); |
| } |
| |
| private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) { |
| return loadMail(response) |
| .map(mailWithEnqueueId -> new RabbitMQMailQueueItem(ack(response, mailWithEnqueueId), mailWithEnqueueId)); |
| } |
| |
| private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery response, MailWithEnqueueId mailWithEnqueueId) { |
| return success -> { |
| if (success) { |
| dequeueMetric.increment(); |
| response.ack(); |
| mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds())); |
| } else { |
| response.nack(REQUEUE); |
| } |
| }; |
| } |
| |
| private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery delivery) { |
| return toMailReference(delivery) |
| .flatMap(reference -> mailLoader.load(reference) |
| .onErrorResume(ObjectNotFoundException.class, e -> { |
| LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e); |
| delivery.nack(!REQUEUE); |
| return Mono.empty(); |
| }) |
| .onErrorResume(e -> { |
| LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e); |
| delivery.nack(REQUEUE); |
| return Mono.empty(); |
| })); |
| } |
| |
| private Mono<MailReferenceDTO> toMailReference(AcknowledgableDelivery delivery) { |
| return Mono.fromCallable(delivery::getBody) |
| .map(Throwing.function(mailReferenceSerializer::read).sneakyThrow()) |
| .onErrorResume(e -> { |
| LOGGER.error("Fail to deserialize MailReferenceDTO. Discarding this message to prevent an infinite loop.", e); |
| delivery.nack(!REQUEUE); |
| return Mono.empty(); |
| }); |
| } |
| } |