// blob: 8e4762d608d1814cc7c985caba18c3ea7b0614f5 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.backends.rabbitmq;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.MoreObjects;
import feign.Client;
import feign.Feign;
import feign.Logger;
import feign.Param;
import feign.RequestLine;
import feign.RetryableException;
import feign.Retryer;
import feign.auth.BasicAuthRequestInterceptor;
import feign.codec.ErrorDecoder;
import feign.jackson.JacksonDecoder;
import feign.jackson.JacksonEncoder;
import feign.slf4j.Slf4jLogger;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
public interface RabbitMQManagementAPI {
/**
 * Description of a queue as mapped from JSON.
 *
 * <p>Every field is bound to the JSON property named in its {@code @JsonProperty}
 * annotation; the class only exposes read accessors.
 */
class MessageQueue {
    @JsonProperty("name")
    String name;

    @JsonProperty("vhost")
    String vhost;

    @JsonProperty("auto_delete")
    boolean autoDelete;

    @JsonProperty("durable")
    boolean durable;

    @JsonProperty("exclusive")
    boolean exclusive;

    @JsonProperty("arguments")
    Map<String, String> arguments;

    /** @return the value bound to the {@code name} property */
    public String getName() {
        return this.name;
    }

    /** @return the value bound to the {@code vhost} property */
    public String getVhost() {
        return this.vhost;
    }

    /** @return the value bound to the {@code auto_delete} property */
    public boolean isAutoDelete() {
        return this.autoDelete;
    }

    /** @return the value bound to the {@code durable} property */
    public boolean isDurable() {
        return this.durable;
    }

    /** @return the value bound to the {@code exclusive} property */
    public boolean isExclusive() {
        return this.exclusive;
    }

    /** @return the map bound to the {@code arguments} property, exactly as stored (no copy) */
    public Map<String, String> getArguments() {
        return this.arguments;
    }
}
/**
 * Detailed description of a single queue as mapped from JSON.
 *
 * <p>Carries the same properties as {@link MessageQueue} plus the
 * {@code consumer_details} list.
 */
class MessageQueueDetails {
    @JsonProperty("name")
    String name;

    @JsonProperty("vhost")
    String vhost;

    @JsonProperty("auto_delete")
    boolean autoDelete;

    @JsonProperty("durable")
    boolean durable;

    @JsonProperty("exclusive")
    boolean exclusive;

    @JsonProperty("arguments")
    Map<String, String> arguments;

    @JsonProperty("consumer_details")
    List<ConsumerDetails> consumerDetails;

    /** @return the value bound to the {@code name} property */
    public String getName() {
        return this.name;
    }

    /** @return the value bound to the {@code vhost} property */
    public String getVhost() {
        return this.vhost;
    }

    /** @return the value bound to the {@code auto_delete} property */
    public boolean isAutoDelete() {
        return this.autoDelete;
    }

    /** @return the value bound to the {@code durable} property */
    public boolean isDurable() {
        return this.durable;
    }

    /** @return the value bound to the {@code exclusive} property */
    public boolean isExclusive() {
        return this.exclusive;
    }

    /** @return the map bound to the {@code arguments} property, exactly as stored (no copy) */
    public Map<String, String> getArguments() {
        return this.arguments;
    }

    /** @return the list bound to the {@code consumer_details} property, exactly as stored (no copy) */
    public List<ConsumerDetails> getConsumerDetails() {
        return this.consumerDetails;
    }
}
/**
 * One entry of a queue's {@code consumer_details} list: the consumer tag and
 * its activity status.
 */
class ConsumerDetails {
    @JsonProperty("consumer_tag")
    String tag;

    @JsonProperty("activity_status")
    ActivityStatus status;

    /** @return the value bound to the {@code activity_status} property */
    public ActivityStatus getStatus() {
        return status;
    }

    /** @return the value bound to the {@code consumer_tag} property */
    public String getTag() {
        return tag;
    }
}
/**
 * Consumer activity status, represented in JSON by the string returned from
 * {@link #getValue()}.
 *
 * <p>NOTE(review): only {@code "waiting"} and {@code "single_active"} are declared
 * here; how any other string sent by the broker is handled depends on the
 * Jackson configuration in use - confirm the full set of possible values.
 */
@JsonFormat(shape = JsonFormat.Shape.STRING)
enum ActivityStatus {
    Waiting("waiting"),
    SingleActive("single_active");

    /** String used for this constant on the wire. */
    private final String value;

    ActivityStatus(String jsonValue) {
        value = jsonValue;
    }

    /** @return the JSON string associated with this constant */
    @JsonValue
    String getValue() {
        return this.value;
    }
}
class Exchange {
@JsonProperty("name")
String name;
@JsonProperty("type")
String type;
@JsonProperty("auto_delete")
boolean autoDelete;
@JsonProperty("durable")
boolean durable;
@JsonProperty("internal")
boolean internal;
@JsonProperty("arguments")
Map<String, String> arguments;
public String getName() {
return name;
}
public String getType() {
return type;
}
public boolean isAutoDelete() {
return autoDelete;
}
public boolean isDurable() {
return durable;
}
public boolean isInternal() {
return internal;
}
public Map<String, String> getArguments() {
return arguments;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", name)
.add("type", type)
.add("autoDelete", autoDelete)
.add("durable", durable)
.add("internal", internal)
.add("arguments", arguments)
.toString();
}
}
/**
 * Description of a binding as mapped from JSON through the annotated constructor.
 *
 * <p>All fields are final and assigned once in the constructor. Equality and
 * hash code are computed over all seven fields.
 */
class BindingSource {
    private final String source;
    private final String vhost;
    private final String destination;
    private final String destinationType;
    private final String routingKey;
    private final Map<String, String> arguments;
    private final String propertiesKey;

    /**
     * Creates a binding description; each parameter is bound to the JSON property
     * named in its {@code @JsonProperty} annotation. Values are stored as given
     * (no validation, no copy of {@code arguments}).
     */
    public BindingSource(@JsonProperty("source") String source,
                         @JsonProperty("vhost") String vhost,
                         @JsonProperty("destination") String destination,
                         @JsonProperty("destination_type") String destinationType,
                         @JsonProperty("routing_key") String routingKey,
                         @JsonProperty("arguments") Map<String, String> arguments,
                         @JsonProperty("properties_key") String propertiesKey) {
        this.source = source;
        this.vhost = vhost;
        this.destination = destination;
        this.destinationType = destinationType;
        this.routingKey = routingKey;
        this.arguments = arguments;
        this.propertiesKey = propertiesKey;
    }

    /** @return the {@code source} value */
    public String getSource() {
        return this.source;
    }

    /** @return the {@code vhost} value */
    public String getVhost() {
        return this.vhost;
    }

    /** @return the {@code destination} value */
    public String getDestination() {
        return this.destination;
    }

    /** @return the {@code destination_type} value */
    public String getDestinationType() {
        return this.destinationType;
    }

    /** @return the {@code routing_key} value */
    public String getRoutingKey() {
        return this.routingKey;
    }

    /** @return the {@code arguments} map, exactly as stored (no copy) */
    public Map<String, String> getArguments() {
        return this.arguments;
    }

    /** @return the {@code properties_key} value */
    public String getPropertiesKey() {
        return this.propertiesKey;
    }

    /**
     * Two bindings are equal when the other object is a {@code BindingSource}
     * and all seven fields are equal (null-safe comparison).
     */
    @Override
    public final boolean equals(Object o) {
        if (!(o instanceof BindingSource)) {
            return false;
        }
        BindingSource other = (BindingSource) o;
        return Objects.equals(source, other.source)
            && Objects.equals(vhost, other.vhost)
            && Objects.equals(destination, other.destination)
            && Objects.equals(destinationType, other.destinationType)
            && Objects.equals(routingKey, other.routingKey)
            && Objects.equals(arguments, other.arguments)
            && Objects.equals(propertiesKey, other.propertiesKey);
    }

    /** Hash over the same seven fields used by {@link #equals(Object)}. */
    @Override
    public final int hashCode() {
        return Objects.hash(source, vhost, destination, destinationType, routingKey, arguments, propertiesKey);
    }
}
/**
 * Builds a Feign-backed implementation of this interface from the given configuration.
 *
 * <p>The client is configured with the HTTP client returned by {@code getClient},
 * basic authentication taken from the management credentials, an SLF4J logger at
 * {@code FULL} level, Jackson encoder and decoder, Feign's default retryer and the
 * {@code RETRY_500} error decoder. It targets the configured management URI.
 *
 * @param configuration source of the management URI, credentials and SSL settings
 * @return the generated API client
 * @throws RuntimeException wrapping any key store, certificate, algorithm or I/O
 *         failure raised while the HTTP client is being prepared
 */
static RabbitMQManagementAPI from(RabbitMQConfiguration configuration) {
    RabbitMQConfiguration.ManagementCredentials credentials = configuration.getManagementCredentials();
    try {
        // NOTE(review): Logger.Level.FULL presumably includes request headers (and so the
        // Authorization header) when debug logging is enabled - confirm this is acceptable.
        return Feign.builder()
            .client(getClient(configuration))
            .requestInterceptor(new BasicAuthRequestInterceptor(credentials.getUser(), new String(credentials.getPassword())))
            .logger(new Slf4jLogger(RabbitMQManagementAPI.class))
            .logLevel(Logger.Level.FULL)
            .encoder(new JacksonEncoder())
            .decoder(new JacksonDecoder())
            .retryer(new Retryer.Default())
            .errorDecoder(RETRY_500)
            .target(RabbitMQManagementAPI.class, configuration.getManagementUri().toString());
    } catch (KeyManagementException | NoSuchAlgorithmException | CertificateException | KeyStoreException | IOException | UnrecoverableKeyException e) {
        // Surface set-up failures as unchecked, keeping the original cause.
        throw new RuntimeException(e);
    }
}
/**
 * Creates the Feign HTTP client for the management API.
 *
 * <p>When {@code useSslManagement()} is false, a {@code Client.Default} with a null
 * socket factory and a null hostname verifier is returned. Otherwise an SSL context
 * is built from the configured validation strategy and optional client certificate,
 * and paired with the configured hostname verifier.
 */
private static Client getClient(RabbitMQConfiguration configuration) throws KeyManagementException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, UnrecoverableKeyException {
    if (!configuration.useSslManagement()) {
        return new Client.Default(null, null);
    }

    SSLContextBuilder contextBuilder = new SSLContextBuilder();
    setupSslValidationStrategy(contextBuilder, configuration);
    setupClientCertificateAuthentication(contextBuilder, configuration);
    SSLContext context = contextBuilder.build();
    return new Client.Default(context.getSocketFactory(), getHostNameVerifier(configuration));
}
/**
 * Chooses the hostname verifier from the SSL configuration.
 *
 * @return a verifier that accepts every hostname for {@code ACCEPT_ANY_HOSTNAME},
 *         a {@code DefaultHostnameVerifier} for any other setting
 */
private static HostnameVerifier getHostNameVerifier(RabbitMQConfiguration configuration) {
    HostnameVerifier verifier;
    switch (configuration.getSslConfiguration().getHostNameVerifier()) {
        case ACCEPT_ANY_HOSTNAME:
            // No hostname check at all: every (host, session) pair is accepted.
            verifier = (host, sslSession) -> true;
            break;
        default:
            verifier = new DefaultHostnameVerifier();
            break;
    }
    return verifier;
}
/**
 * Loads client key material into the builder when the SSL configuration declares a
 * key store; does nothing otherwise.
 *
 * <p>The key store's password is passed for both password arguments of
 * {@code loadKeyMaterial}.
 */
private static void setupClientCertificateAuthentication(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration configuration) throws NoSuchAlgorithmException, KeyStoreException, UnrecoverableKeyException, CertificateException, IOException {
    Optional<RabbitMQConfiguration.SSLConfiguration.SSLKeyStore> maybeKeyStore = configuration.getSslConfiguration().getKeyStore();
    if (!maybeKeyStore.isPresent()) {
        return;
    }

    RabbitMQConfiguration.SSLConfiguration.SSLKeyStore store = maybeKeyStore.get();
    sslContextBuilder.loadKeyMaterial(store.getFile(), store.getPassword(), store.getPassword());
}
/**
 * Applies the configured certificate validation strategy to the builder.
 *
 * <ul>
 *   <li>{@code DEFAULT}: the builder is left untouched.</li>
 *   <li>{@code IGNORE}: a trust strategy that accepts every certificate chain is loaded.</li>
 *   <li>{@code OVERRIDE}: the configured trust store is loaded via {@code applyTrustStore}.</li>
 * </ul>
 *
 * @throws NotImplementedException for any other strategy value
 */
private static void setupSslValidationStrategy(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration configuration) throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException {
    RabbitMQConfiguration.SSLConfiguration.SSLValidationStrategy strategy =
        configuration.getSslConfiguration().getStrategy();

    switch (strategy) {
        case DEFAULT:
            // Nothing to load for this strategy.
            break;
        case IGNORE: {
            // Certificate validation is disabled: every chain is reported as trusted.
            TrustStrategy trustAll = (chain, authType) -> true;
            sslContextBuilder.loadTrustMaterial(trustAll);
            break;
        }
        case OVERRIDE:
            applyTrustStore(sslContextBuilder, configuration);
            break;
        default:
            throw new NotImplementedException(
                String.format("unrecognized strategy '%s'", strategy.name()));
    }
}
/**
 * Loads the configured trust store (file and password) into the builder.
 *
 * @return whatever {@code loadTrustMaterial} returns for the given builder
 * @throws IllegalStateException when the SSL configuration has no trust store
 */
private static SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder, RabbitMQConfiguration configuration) throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
    RabbitMQConfiguration.SSLConfiguration.SSLTrustStore store = configuration
        .getSslConfiguration()
        .getTrustStore()
        .orElseThrow(() -> new IllegalStateException("SSLTrustStore cannot to be empty"));

    return sslContextBuilder.loadTrustMaterial(store.getFile(), store.getPassword());
}
/**
 * Error decoder used by {@code from}: a response with status 500 raises a
 * {@link RetryableException} whose retry date is the current time, any other
 * status raises a plain {@link RuntimeException}.
 *
 * <p>Both exceptions are thrown from inside the decoder rather than returned.
 */
ErrorDecoder RETRY_500 = (methodKey, response) -> {
    int status = response.status();
    if (status != 500) {
        throw new RuntimeException("Non recoverable exception status: " + status);
    }
    throw new RetryableException(status, "Error encountered, scheduling retry", response.request().httpMethod(), new Date());
};
/**
 * Declares the request {@code GET /api/queues}.
 *
 * @return the response mapped to a list of {@link MessageQueue}
 */
@RequestLine("GET /api/queues")
List<MessageQueue> listQueues();
/**
 * Declares the request {@code GET /api/queues/{vhost}/{name}}.
 *
 * <p>NOTE(review): {@code decodeSlash = false} presumably keeps an encoded slash
 * ({@code %2F}) in an expanded parameter from being turned back into {@code /} -
 * confirm against Feign's {@code RequestLine} documentation.
 *
 * @param vhost value substituted for {@code {vhost}} in the path
 * @param name value substituted for {@code {name}} in the path
 * @return the response mapped to {@link MessageQueueDetails}
 */
@RequestLine(value = "GET /api/queues/{vhost}/{name}", decodeSlash = false)
MessageQueueDetails queueDetails(@Param("vhost") String vhost, @Param("name") String name);
/**
 * Declares the request {@code DELETE /api/queues/{vhost}/{name}}; no response value is mapped.
 *
 * <p>NOTE(review): {@code decodeSlash = false} presumably keeps an encoded slash
 * ({@code %2F}) in an expanded parameter from being turned back into {@code /} -
 * confirm against Feign's {@code RequestLine} documentation.
 *
 * @param vhost value substituted for {@code {vhost}} in the path
 * @param name value substituted for {@code {name}} in the path
 */
@RequestLine(value = "DELETE /api/queues/{vhost}/{name}", decodeSlash = false)
void deleteQueue(@Param("vhost") String vhost, @Param("name") String name);
/**
 * Declares the request {@code GET /api/exchanges/{vhost}/{name}/bindings/source}.
 *
 * <p>NOTE(review): {@code decodeSlash = false} presumably keeps an encoded slash
 * ({@code %2F}) in an expanded parameter from being turned back into {@code /} -
 * confirm against Feign's {@code RequestLine} documentation.
 *
 * @param vhost value substituted for {@code {vhost}} in the path
 * @param name value substituted for {@code {name}} in the path
 * @return the response mapped to a list of {@link BindingSource}
 */
@RequestLine(value = "GET /api/exchanges/{vhost}/{name}/bindings/source", decodeSlash = false)
List<BindingSource> listBindings(@Param("vhost") String vhost, @Param("name") String name);
/**
 * Declares the request {@code GET /api/exchanges}.
 *
 * @return the response mapped to a list of {@link Exchange}
 */
@RequestLine("GET /api/exchanges")
List<Exchange> listExchanges();
}