blob: 1cfe96d38531f9e9b852131ff0f46312690b8185 [file] [log] [blame]
/**
* *************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
***************************************************************/
package org.apache.james.task.eventsourcing.distributed;
import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManagerWorker;
import org.apache.james.task.TaskWithId;
import org.apache.james.task.WorkQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;
/**
 * {@link WorkQueue} implementation backed by RabbitMQ.
 *
 * <p>Submitted tasks are serialized to JSON and published to {@link #EXCHANGE_NAME} with the task id
 * carried in the {@link #TASK_ID} header. When the work queue configuration is enabled, this instance
 * also consumes {@link #QUEUE_NAME} and hands every received task to the {@link TaskManagerWorker}.
 *
 * <p>Cancel requests are published to {@link #CANCEL_REQUESTS_EXCHANGE_NAME} and received through the
 * queue named by the injected {@link CancelRequestQueueName}.
 */
public class RabbitMQWorkQueue implements WorkQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class);

    static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange";
    static final String QUEUE_NAME = "taskManagerWorkQueue";
    static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey";
    static final String CANCEL_REQUESTS_EXCHANGE_NAME = "taskManagerCancelRequestsExchange";
    static final String CANCEL_REQUESTS_ROUTING_KEY = "taskManagerCancelRequestsRoutingKey";

    /** Name of the AMQP header carrying the id of a submitted task. */
    public static final String TASK_ID = "taskId";
    /** Maximum number of retries applied to each declaration performed by {@link #declareQueue()}. */
    public static final int NUM_RETRIES = 8;
    /** Initial delay of the backoff applied between declaration retries. */
    public static final Duration FIRST_BACKOFF = Duration.ofMillis(100);
    public static final boolean AUTO_DELETE_QUEUE = true;

    private final TaskManagerWorker worker;
    private final JsonTaskSerializer taskSerializer;
    private final RabbitMQWorkQueueConfiguration configuration;
    private final Sender sender;
    private final ReceiverProvider receiverProvider;
    private final RabbitMQConfiguration rabbitMQConfiguration;
    private final CancelRequestQueueName cancelRequestQueueName;

    // Assigned by listenToCancelRequests(), hence only available once start() has been called.
    private UnicastProcessor<TaskId> sendCancelRequestsQueue;
    private Disposable sendCancelRequestsQueueHandle;
    private Disposable receiverHandle;
    private Disposable cancelRequestListenerHandle;

    public RabbitMQWorkQueue(TaskManagerWorker worker, Sender sender,
                             ReceiverProvider receiverProvider, JsonTaskSerializer taskSerializer,
                             RabbitMQWorkQueueConfiguration configuration, CancelRequestQueueName cancelRequestQueueName,
                             RabbitMQConfiguration rabbitMQConfiguration) {
        this.cancelRequestQueueName = cancelRequestQueueName;
        this.worker = worker;
        this.receiverProvider = receiverProvider;
        this.sender = sender;
        this.taskSerializer = taskSerializer;
        this.configuration = configuration;
        this.rabbitMQConfiguration = rabbitMQConfiguration;
    }

    /**
     * Declares the RabbitMQ resources, starts consuming the work queue (when enabled by the
     * configuration) and starts listening to cancel requests.
     */
    @Override
    public void start() {
        startWorkqueue();
        listenToCancelRequests();
    }

    private void startWorkqueue() {
        declareQueue();
        if (configuration.enabled()) {
            consumeWorkqueue();
        }
    }

    /**
     * Declares the work queue exchange, the durable work queue and the binding between them, in that
     * order, blocking until all three operations have completed.
     *
     * <p>Each operation is retried up to {@link #NUM_RETRIES} times with a backoff starting at
     * {@link #FIRST_BACKOFF}.
     */
    @VisibleForTesting
    void declareQueue() {
        Mono<AMQP.Exchange.DeclareOk> declareExchange = sender
            .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME))
            .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));

        Mono<AMQP.Queue.DeclareOk> declareQueue = sender
            .declare(QueueSpecification.queue(QUEUE_NAME)
                .durable(true)
                .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!AUTO_DELETE_QUEUE)
                    .singleActiveConsumer()
                    .consumerTimeout(rabbitMQConfiguration.getTaskQueueConsumerTimeout().toMillis())
                    .build()))
            .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));

        Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender
            .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME))
            .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));

        declareExchange
            .then(declareQueue)
            .then(bindQueueToExchange)
            .block();
    }

    /**
     * Disposes the current consumers and publisher, then runs {@link #start()} again.
     */
    @Override
    public void restart() {
        closeRabbitResources();
        start();
    }

    /**
     * Subscribes to the work queue with manual acknowledgement and executes the received tasks one
     * at a time, in delivery order. The receiver is closed when the subscription terminates.
     */
    private void consumeWorkqueue() {
        receiverHandle = Flux.using(
                receiverProvider::createReceiver,
                receiver -> receiver.consumeManualAck(QUEUE_NAME, new ConsumeOptions()),
                Receiver::close)
            .subscribeOn(Schedulers.elastic())
            .concatMap(this::executeTask)
            .subscribe();
    }

    /**
     * Processes one delivery of the work queue.
     *
     * <p>The delivery is always settled: it is acknowledged as soon as its task has been deserialized
     * (that is, before the task is executed), and rejected without requeue when the task id cannot be
     * read, when the payload yields no task, or when any other error occurs before execution starts.
     *
     * @param delivery the message received from the work queue
     * @return the result of the task, or an empty Mono when the task could not be run
     */
    private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) {
        return Mono.fromCallable(() -> readTaskId(delivery))
            .flatMap(taskId -> Mono.fromCallable(() -> new String(delivery.getBody(), StandardCharsets.UTF_8))
                .flatMap(bodyValue -> deserialize(bodyValue, taskId))
                // deserialize() completes empty once it has reported the failure to the worker; the
                // delivery would otherwise be left neither acked nor nacked.
                .switchIfEmpty(Mono.<Task>fromRunnable(() -> delivery.nack(!REQUEUE)))
                .doOnNext(task -> delivery.ack())
                .flatMap(task -> executeOnWorker(taskId, task)))
            .onErrorResume(error -> {
                Optional<Object> taskId = readTaskIdHeader(delivery);
                LOGGER.error("Unable to process {} {}", TASK_ID, taskId, error);
                delivery.nack(!REQUEUE);
                return Mono.empty();
            });
    }

    /**
     * Reads the raw value of the {@link #TASK_ID} header, tolerating absent properties or headers.
     */
    private Optional<Object> readTaskIdHeader(AcknowledgableDelivery delivery) {
        return Optional.ofNullable(delivery.getProperties())
            .map(AMQP.BasicProperties::getHeaders)
            .map(headers -> headers.get(TASK_ID));
    }

    /**
     * Extracts the task id of a delivery.
     *
     * @throws IllegalArgumentException when the delivery carries no {@link #TASK_ID} header, so that
     *     the caller rejects the message instead of silently dropping it
     */
    private TaskId readTaskId(AcknowledgableDelivery delivery) {
        return readTaskIdHeader(delivery)
            .map(taskIdValue -> TaskId.fromString(taskIdValue.toString()))
            .orElseThrow(() -> new IllegalArgumentException("Missing " + TASK_ID + " header"));
    }

    /**
     * Deserializes a task from its JSON representation.
     *
     * @return the task, or an empty Mono once a deserialization failure has been logged and reported
     *     to the worker through {@code worker.fail}
     */
    private Mono<Task> deserialize(String json, TaskId taskId) {
        return Mono.fromCallable(() -> taskSerializer.deserialize(json))
            .onErrorResume(error -> {
                String errorMessage = String.format("Unable to deserialize submitted Task %s", taskId.asString());
                LOGGER.error(errorMessage, error);
                return Mono.from(worker.fail(taskId, Optional.empty(), errorMessage, error))
                    .then(Mono.empty());
            });
    }

    /**
     * Runs the task on the worker.
     *
     * @return the task result, or an empty Mono once an execution error has been logged and reported
     *     to the worker through {@code worker.fail}
     */
    private Mono<Task.Result> executeOnWorker(TaskId taskId, Task task) {
        return worker.executeTask(new TaskWithId(taskId, task))
            .onErrorResume(error -> {
                String errorMessage = String.format("Unable to run submitted Task %s", taskId.asString());
                LOGGER.warn(errorMessage, error);
                return Mono.from(worker.fail(taskId, task.details(), errorMessage, error))
                    .then(Mono.empty());
            });
    }

    /**
     * Declares the cancel request exchange, queue and binding, starts consuming that queue, and sets
     * up the processor through which {@link #cancel(TaskId)} publishes cancel requests.
     */
    private void listenToCancelRequests() {
        sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
        sender.declare(QueueSpecification.queue(cancelRequestQueueName.asString()).durable(!DURABLE).autoDelete(AUTO_DELETE)).block();
        sender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, cancelRequestQueueName.asString())).block();
        registerCancelRequestsListener(cancelRequestQueueName.asString());

        sendCancelRequestsQueue = UnicastProcessor.create();
        sendCancelRequestsQueueHandle = sender
            .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage))
            .subscribeOn(Schedulers.elastic())
            .subscribe();
    }

    /**
     * Consumes the given queue with automatic acknowledgement and forwards each cancel request to
     * the worker, one message at a time.
     */
    private void registerCancelRequestsListener(String queueName) {
        cancelRequestListenerHandle = Flux.using(
                receiverProvider::createReceiver,
                receiver -> receiver.consumeAutoAck(queueName),
                Receiver::close)
            .subscribeOn(Schedulers.elastic())
            .concatMap(this::handleCancelRequest)
            .subscribe();
    }

    /**
     * Handles a single cancel request.
     *
     * <p>Failures (malformed task id, error raised by the worker) are logged and swallowed: letting
     * them propagate would terminate the listener and stop later cancel requests from being handled.
     */
    private Mono<Void> handleCancelRequest(Delivery delivery) {
        return Mono.<Void>fromRunnable(() -> worker.cancelTask(readCancelRequestMessage(delivery)))
            .onErrorResume(error -> {
                LOGGER.error("Unable to process cancel request", error);
                return Mono.empty();
            });
    }

    /** Decodes the UTF-8 body of a cancel request into the task id it designates. */
    private TaskId readCancelRequestMessage(Delivery delivery) {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        return TaskId.fromString(message);
    }

    /** Builds the cancel request message for a task: its id, UTF-8 encoded, with default properties. */
    private OutboundMessage makeCancelRequestMessage(TaskId taskId) {
        byte[] payload = taskId.asString().getBytes(StandardCharsets.UTF_8);
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().build();
        return new OutboundMessage(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, basicProperties, payload);
    }

    /**
     * Serializes the task and publishes it to the work queue exchange, blocking until the send
     * operation has completed.
     *
     * @param taskWithId the task to submit, together with its id
     * @throws RuntimeException wrapping the {@link JsonProcessingException} raised when the task
     *     cannot be serialized
     */
    @Override
    public void submit(TaskWithId taskWithId) {
        try {
            byte[] payload = taskSerializer.serialize(taskWithId.getTask()).getBytes(StandardCharsets.UTF_8);
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
                .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
                .priority(PERSISTENT_TEXT_PLAIN.getPriority())
                .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
                .headers(ImmutableMap.of(TASK_ID, taskWithId.getId().asString()))
                .build();

            OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
            sender.send(Mono.just(outboundMessage)).block();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Queues a cancel request for publication to the cancel request exchange.
     *
     * <p>{@link #start()} must have been called beforehand, as it creates the underlying processor.
     *
     * @param taskId the id of the task to cancel
     */
    @Override
    public void cancel(TaskId taskId) {
        sendCancelRequestsQueue.onNext(taskId);
    }

    @Override
    public void close() {
        closeRabbitResources();
    }

    /** Disposes whichever of the three subscriptions have been created so far. */
    private void closeRabbitResources() {
        Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose);
        Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose);
        Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose);
    }
}