blob: f115fd5eca0fdcfe3e03561ba0b44b670dc681f4 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.events;
import java.util.Collection;
import java.util.Set;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.MetricFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.Sender;
public class RabbitMQEventBus implements EventBus, Startable {
private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of();
private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
static final String EVENT_BUS_ID = "eventBusId";
private final NamingStrategy namingStrategy;
private final EventSerializer eventSerializer;
private final RoutingKeyConverter routingKeyConverter;
private final RetryBackoffConfiguration retryBackoff;
private final EventBusId eventBusId;
private final EventDeadLetters eventDeadLetters;
private final ListenerExecutor listenerExecutor;
private final Sender sender;
private final ReceiverProvider receiverProvider;
private final ReactorRabbitMQChannelPool channelPool;
private final RabbitMQConfiguration configuration;
private final MetricFactory metricFactory;
private volatile boolean isRunning;
private volatile boolean isStopping;
private GroupRegistrationHandler groupRegistrationHandler;
private KeyRegistrationHandler keyRegistrationHandler;
private EventDispatcher eventDispatcher;
@Inject
public RabbitMQEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer,
RetryBackoffConfiguration retryBackoff,
RoutingKeyConverter routingKeyConverter,
EventDeadLetters eventDeadLetters, MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool,
EventBusId eventBusId, RabbitMQConfiguration configuration) {
this.namingStrategy = namingStrategy;
this.sender = sender;
this.receiverProvider = receiverProvider;
this.listenerExecutor = new ListenerExecutor(metricFactory);
this.channelPool = channelPool;
this.eventBusId = eventBusId;
this.eventSerializer = eventSerializer;
this.routingKeyConverter = routingKeyConverter;
this.retryBackoff = retryBackoff;
this.eventDeadLetters = eventDeadLetters;
this.configuration = configuration;
this.metricFactory = metricFactory;
this.isRunning = false;
this.isStopping = false;
}
public void start() {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff, configuration, metricFactory);
groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId, configuration);
eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters, configuration);
eventDispatcher.start();
keyRegistrationHandler.start();
isRunning = true;
}
}
public void restart() {
keyRegistrationHandler.restart();
groupRegistrationHandler.restart();
}
@VisibleForTesting
void startWithoutStartingKeyRegistrationHandler() {
if (!isRunning && !isStopping) {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff, configuration, metricFactory);
groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId, configuration);
eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters, configuration);
keyRegistrationHandler.declareQueue();
eventDispatcher.start();
isRunning = true;
}
}
@VisibleForTesting
void startKeyRegistrationHandler() {
keyRegistrationHandler.start();
}
@PreDestroy
public void stop() {
if (isRunning && !isStopping) {
isStopping = true;
isRunning = false;
groupRegistrationHandler.stop();
keyRegistrationHandler.stop();
}
}
@Override
public Mono<Registration> register(EventListener.ReactiveEventListener listener, RegistrationKey key) {
Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
return Mono.from(metricFactory.decoratePublisherWithTimerMetric("rabbit-register", keyRegistrationHandler.register(listener, key)));
}
@Override
public Registration register(EventListener.ReactiveEventListener listener, Group group) {
Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
return groupRegistrationHandler.register(listener, group);
}
@Override
public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
if (!event.isNoop()) {
return Mono.from(metricFactory.decoratePublisherWithTimerMetric("rabbit-dispatch", eventDispatcher.dispatch(event, key)));
}
return Mono.empty();
}
@Override
public Mono<Void> reDeliver(Group group, Event event) {
Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
if (!event.isNoop()) {
/*
if the eventBus.dispatch() gets error while dispatching an event (rabbitMQ network outage maybe),
which means all the group consumers will not be receiving that event.
We store the that event in the dead letter and expecting in the future, it will be dispatched
again not only for a specific consumer but all.
That's why it is special, and we need to check event type before processing further.
*/
if (group instanceof DispatchingFailureGroup) {
return eventDispatcher.dispatch(event, NO_KEY);
}
return groupRegistrationHandler.retrieveGroupRegistration(group).reDeliver(event);
}
return Mono.empty();
}
@Override
public EventBusName eventBusName() {
return namingStrategy.getEventBusName();
}
@Override
public Collection<Group> listRegisteredGroups() {
return groupRegistrationHandler.registeredGroups();
}
}