blob: 58a68517dab4d2b3020122a61d663bae3fbced42 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.task.eventsourcing.distributed;
import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.util.ReactorUtils.publishIfPresent;
import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import org.apache.james.backends.rabbitmq.QueueArguments;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.JsonEventSerializer;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.task.eventsourcing.TerminationSubscriber;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;
public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
static final String EXCHANGE_NAME = "terminationSubscriberExchange";
static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
private final TerminationQueueName queueName;
private final JsonEventSerializer serializer;
private final Sender sender;
private final ReceiverProvider receiverProvider;
private final RabbitMQConfiguration rabbitMQConfiguration;
private Sinks.Many<OutboundMessage> sendQueue;
private Sinks.Many<Event> listener;
private Disposable sendQueueHandle;
private Disposable listenQueueHandle;
@Inject
RabbitMQTerminationSubscriber(TerminationQueueName queueName, Sender sender, ReceiverProvider receiverProvider, JsonEventSerializer serializer, RabbitMQConfiguration rabbitMQConfiguration) {
this.queueName = queueName;
this.sender = sender;
this.receiverProvider = receiverProvider;
this.serializer = serializer;
this.rabbitMQConfiguration = rabbitMQConfiguration;
}
public void start() {
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
QueueArguments.Builder builder = QueueArguments.builder();
rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(!AUTO_DELETE).arguments(builder.build())).block();
sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName.asString())).block();
sendQueue = Sinks.many().unicast().onBackpressureBuffer();
sendQueueHandle = sender
.send(sendQueue.asFlux())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
listener = Sinks.many().multicast().directBestEffort();
listenQueueHandle = consumeTerminationQueue();
}
public void restart() {
Disposable previousHandler = listenQueueHandle;
listenQueueHandle = consumeTerminationQueue();
previousHandler.dispose();
}
private Disposable consumeTerminationQueue() {
return Flux.using(
receiverProvider::createReceiver,
receiver -> receiver.consumeAutoAck(queueName.asString()),
Receiver::close)
.subscribeOn(Schedulers.boundedElastic())
.map(this::toEvent)
.handle(publishIfPresent())
.subscribe(e -> listener.emitNext(e, FAIL_FAST));
}
@Override
public void addEvent(Event event) {
try {
byte[] payload = serializer.serialize(event).getBytes(StandardCharsets.UTF_8);
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().build();
OutboundMessage message = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
sendQueue.emitNext(message, FAIL_FAST);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Override
public Publisher<Event> listenEvents() {
return listener.asFlux();
}
private Optional<Event> toEvent(Delivery delivery) {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
try {
Event event = serializer.deserialize(message);
return Optional.of(event);
} catch (Exception e) {
LOGGER.error("Unable to deserialize '{}'", message, e);
return Optional.empty();
}
}
@Override
@PreDestroy
public void close() {
Optional.ofNullable(sendQueueHandle).ifPresent(Disposable::dispose);
Optional.ofNullable(listenQueueHandle).ifPresent(Disposable::dispose);
}
}