blob: 13db251637bfab033e60e78d8f1fd27ee7fa805e [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.queue.rabbitmq;
import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
import java.time.Clock;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.mailet.Mail;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
class Enqueuer {
private final MailQueueName name;
private final Sender sender;
private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final MailReferenceSerializer mailReferenceSerializer;
private final Metric enqueueMetric;
private final MailQueueView mailQueueView;
private final Clock clock;
Enqueuer(MailQueueName name, Sender sender, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore,
MailReferenceSerializer serializer, MetricFactory metricFactory,
MailQueueView mailQueueView, Clock clock) {
this.name = name;
this.sender = sender;
this.mimeMessageStore = mimeMessageStore;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
this.clock = clock;
this.enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString());
}
void enQueue(Mail mail) throws MailQueue.MailQueueException {
EnqueueId enqueueId = EnqueueId.generate();
saveMail(mail)
.map(partIds -> new MailReference(enqueueId, mail, partIds))
.flatMap(Throwing.<MailReference, Mono<Void>>function(mailReference -> {
EnqueuedItem enqueuedItem = toEnqueuedItems(mailReference);
return Flux.mergeDelayError(2,
mailQueueView.storeMail(enqueuedItem),
publishReferenceToRabbit(mailReference))
.then();
}).sneakyThrow())
.thenEmpty(Mono.fromRunnable(enqueueMetric::increment))
.block();
}
Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) {
Mail mail = item.getMail();
return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), mail, item.getEnqueuedPartsId()))
.flatMap(Throwing.function(this::publishReferenceToRabbit).sneakyThrow())
.then();
}
private Mono<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException {
try {
return mimeMessageStore.save(mail.getMessage());
} catch (MessagingException e) {
throw new MailQueue.MailQueueException("Error while saving blob", e);
}
}
private Mono<Void> publishReferenceToRabbit(MailReference mailReference) throws MailQueue.MailQueueException {
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
.priority(PERSISTENT_TEXT_PLAIN.getPriority())
.contentType(PERSISTENT_TEXT_PLAIN.getContentType())
.headers(ImmutableMap.of("x-dead-letter-routing-key", EMPTY_ROUTING_KEY))
.build();
OutboundMessage data = new OutboundMessage(
name.toRabbitExchangeName().asString(),
EMPTY_ROUTING_KEY,
basicProperties,
getMailReferenceBytes(mailReference));
return sender.send(Mono.just(data));
}
private EnqueuedItem toEnqueuedItems(MailReference mailReference) {
return EnqueuedItem.builder()
.enqueueId(mailReference.getEnqueueId())
.mailQueueName(name)
.mail(mailReference.getMail())
.enqueuedTime(clock.instant())
.mimeMessagePartsId(mailReference.getPartsId())
.build();
}
private byte[] getMailReferenceBytes(MailReference mailReference) throws MailQueue.MailQueueException {
try {
MailReferenceDTO mailDTO = MailReferenceDTO.fromMailReference(mailReference);
return mailReferenceSerializer.write(mailDTO);
} catch (JsonProcessingException e) {
throw new MailQueue.MailQueueException("Unable to serialize message", e);
}
}
}