| /** |
| * ************************************************************* |
| * Licensed to the Apache Software Foundation (ASF) under one * |
| * or more contributor license agreements. See the NOTICE file * |
| * distributed with this work for additional information * |
| * regarding copyright ownership. The ASF licenses this file * |
| * to you under the Apache License, Version 2.0 (the * |
| * "License"); you may not use this file except in compliance * |
| * with the License. You may obtain a copy of the License at * |
| * * |
| * http://www.apache.org/licenses/LICENSE-2.0 * |
| * * |
| * Unless required by applicable law or agreed to in writing, * |
| * software distributed under the License is distributed on an * |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * |
| * KIND, either express or implied. See the License for the * |
| * specific language governing permissions and limitations * |
| * under the License. * |
| ***************************************************************/ |
| |
| package org.apache.james.task.eventsourcing.distributed; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.Optional; |
| import java.util.UUID; |
| |
| import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; |
| import org.apache.james.backends.rabbitmq.SimpleConnectionPool; |
| import org.apache.james.lifecycle.api.Startable; |
| import org.apache.james.server.task.json.JsonTaskSerializer; |
| import org.apache.james.task.Task; |
| import org.apache.james.task.TaskId; |
| import org.apache.james.task.TaskManagerWorker; |
| import org.apache.james.task.TaskWithId; |
| import org.apache.james.task.WorkQueue; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.google.common.collect.ImmutableMap; |
| import com.rabbitmq.client.AMQP; |
| import com.rabbitmq.client.Connection; |
| import com.rabbitmq.client.Delivery; |
| import reactor.core.Disposable; |
| import reactor.core.publisher.Mono; |
| import reactor.core.publisher.UnicastProcessor; |
| import reactor.core.scheduler.Schedulers; |
| import reactor.rabbitmq.AcknowledgableDelivery; |
| import reactor.rabbitmq.BindingSpecification; |
| import reactor.rabbitmq.ConsumeOptions; |
| import reactor.rabbitmq.ExchangeSpecification; |
| import reactor.rabbitmq.OutboundMessage; |
| import reactor.rabbitmq.QueueSpecification; |
| import reactor.rabbitmq.RabbitFlux; |
| import reactor.rabbitmq.Receiver; |
| import reactor.rabbitmq.ReceiverOptions; |
| import reactor.rabbitmq.Sender; |
| |
| public class RabbitMQWorkQueue implements WorkQueue, Startable { |
| private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQWorkQueue.class); |
| |
| // Need at least one by receivers plus a shared one for senders |
| static final Integer MAX_CHANNELS_NUMBER = 5; |
| static final String EXCHANGE_NAME = "taskManagerWorkQueueExchange"; |
| static final String QUEUE_NAME = "taskManagerWorkQueue"; |
| static final String ROUTING_KEY = "taskManagerWorkQueueRoutingKey"; |
| |
| static final String CANCEL_REQUESTS_EXCHANGE_NAME = "taskManagerCancelRequestsExchange"; |
| static final String CANCEL_REQUESTS_ROUTING_KEY = "taskManagerCancelRequestsRoutingKey"; |
| private static final String CANCEL_REQUESTS_QUEUE_NAME_PREFIX = "taskManagerCancelRequestsQueue"; |
| public static final String TASK_ID = "taskId"; |
| |
| private final TaskManagerWorker worker; |
| private final Mono<Connection> connectionMono; |
| private final ReactorRabbitMQChannelPool channelPool; |
| private final JsonTaskSerializer taskSerializer; |
| private Sender sender; |
| private RabbitMQExclusiveConsumer receiver; |
| private UnicastProcessor<TaskId> sendCancelRequestsQueue; |
| private Disposable sendCancelRequestsQueueHandle; |
| private Disposable receiverHandle; |
| private Disposable cancelRequestListenerHandle; |
| private Sender cancelRequestSender; |
| private Receiver cancelRequestListener; |
| |
| public RabbitMQWorkQueue(TaskManagerWorker worker, SimpleConnectionPool simpleConnectionPool, JsonTaskSerializer taskSerializer) { |
| this.worker = worker; |
| this.connectionMono = simpleConnectionPool.getResilientConnection(); |
| this.taskSerializer = taskSerializer; |
| this.channelPool = new ReactorRabbitMQChannelPool(connectionMono, MAX_CHANNELS_NUMBER); |
| } |
| |
| public void start() { |
| startWorkqueue(); |
| listenToCancelRequests(); |
| } |
| |
| private void startWorkqueue() { |
| sender = channelPool.createSender(); |
| sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); |
| sender.declare(QueueSpecification.queue(QUEUE_NAME).durable(true)).block(); |
| sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME)).block(); |
| |
| consumeWorkqueue(); |
| } |
| |
| private void consumeWorkqueue() { |
| receiver = new RabbitMQExclusiveConsumer(new ReceiverOptions().connectionMono(connectionMono)); |
| receiverHandle = receiver.consumeExclusiveManualAck(QUEUE_NAME, new ConsumeOptions()) |
| .subscribeOn(Schedulers.boundedElastic()) |
| .flatMap(this::executeTask) |
| .subscribe(); |
| } |
| |
| private Mono<Task.Result> executeTask(AcknowledgableDelivery delivery) { |
| delivery.ack(); |
| String json = new String(delivery.getBody(), StandardCharsets.UTF_8); |
| |
| TaskId taskId = TaskId.fromString(delivery.getProperties().getHeaders().get(TASK_ID).toString()); |
| |
| return deserialize(json, taskId) |
| .flatMap(task -> executeOnWorker(taskId, task)); |
| } |
| |
| private Mono<Task> deserialize(String json, TaskId taskId) { |
| return Mono.fromCallable(() -> taskSerializer.deserialize(json)) |
| .doOnError(error -> { |
| String errorMessage = String.format("Unable to deserialize submitted Task %s", taskId.asString()); |
| LOGGER.error(errorMessage, error); |
| worker.fail(taskId, Optional.empty(), errorMessage, error); |
| }) |
| .onErrorResume(error -> Mono.empty()); |
| } |
| |
| private Mono<Task.Result> executeOnWorker(TaskId taskId, Task task) { |
| return worker.executeTask(new TaskWithId(taskId, task)) |
| .doOnError(error -> { |
| String errorMessage = String.format("Unable to run submitted Task %s", taskId.asString()); |
| LOGGER.warn(errorMessage, error); |
| worker.fail(taskId, task.details(), errorMessage, error); |
| }) |
| .onErrorResume(error -> Mono.empty()); |
| } |
| |
| void listenToCancelRequests() { |
| cancelRequestSender = channelPool.createSender(); |
| String queueName = CANCEL_REQUESTS_QUEUE_NAME_PREFIX + UUID.randomUUID().toString(); |
| |
| cancelRequestSender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block(); |
| cancelRequestSender.declare(QueueSpecification.queue(queueName).durable(false).autoDelete(true)).block(); |
| cancelRequestSender.bind(BindingSpecification.binding(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, queueName)).block(); |
| registerCancelRequestsListener(queueName); |
| |
| sendCancelRequestsQueue = UnicastProcessor.create(); |
| sendCancelRequestsQueueHandle = cancelRequestSender |
| .send(sendCancelRequestsQueue.map(this::makeCancelRequestMessage)) |
| .subscribeOn(Schedulers.boundedElastic()) |
| .subscribe(); |
| } |
| |
| private void registerCancelRequestsListener(String queueName) { |
| cancelRequestListener = RabbitFlux |
| .createReceiver(new ReceiverOptions().connectionMono(connectionMono)); |
| cancelRequestListenerHandle = cancelRequestListener |
| .consumeAutoAck(queueName) |
| .subscribeOn(Schedulers.boundedElastic()) |
| .map(this::readCancelRequestMessage) |
| .doOnNext(worker::cancelTask) |
| .subscribe(); |
| } |
| |
| private TaskId readCancelRequestMessage(Delivery delivery) { |
| String message = new String(delivery.getBody(), StandardCharsets.UTF_8); |
| return TaskId.fromString(message); |
| } |
| |
| private OutboundMessage makeCancelRequestMessage(TaskId taskId) { |
| byte[] payload = taskId.asString().getBytes(StandardCharsets.UTF_8); |
| AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().build(); |
| return new OutboundMessage(CANCEL_REQUESTS_EXCHANGE_NAME, CANCEL_REQUESTS_ROUTING_KEY, basicProperties, payload); |
| } |
| |
| @Override |
| public void submit(TaskWithId taskWithId) { |
| try { |
| byte[] payload = taskSerializer.serialize(taskWithId.getTask()).getBytes(StandardCharsets.UTF_8); |
| AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() |
| .headers(ImmutableMap.of(TASK_ID, taskWithId.getId().asString())) |
| .build(); |
| OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload); |
| sender.send(Mono.just(outboundMessage)).block(); |
| } catch (JsonProcessingException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void cancel(TaskId taskId) { |
| sendCancelRequestsQueue.onNext(taskId); |
| } |
| |
| @Override |
| public void close() { |
| Optional.ofNullable(receiverHandle).ifPresent(Disposable::dispose); |
| Optional.ofNullable(receiver).ifPresent(RabbitMQExclusiveConsumer::close); |
| Optional.ofNullable(sender).ifPresent(Sender::close); |
| Optional.ofNullable(sendCancelRequestsQueueHandle).ifPresent(Disposable::dispose); |
| Optional.ofNullable(cancelRequestListenerHandle).ifPresent(Disposable::dispose); |
| Optional.ofNullable(sender).ifPresent(Sender::close); |
| Optional.ofNullable(cancelRequestSender).ifPresent(Sender::close); |
| Optional.ofNullable(cancelRequestListener).ifPresent(Receiver::close); |
| channelPool.close(); |
| } |
| } |