blob: bc87d19f36d071232580c1f996469bcd8dd2eade [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.backends.opensearch;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.time.LocalDateTime;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.opensearch.client.RestClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
public class ClientProvider implements Provider<ReactorOpenSearchClient> {
private static class HttpAsyncClientConfigurer {
private static final TrustStrategy TRUST_ALL = (x509Certificates, authType) -> true;
private static final HostnameVerifier ACCEPT_ANY_HOSTNAME = (hostname, sslSession) -> true;
private final OpenSearchConfiguration configuration;
private HttpAsyncClientConfigurer(OpenSearchConfiguration configuration) {
this.configuration = configuration;
}
private HttpAsyncClientBuilder configure(HttpAsyncClientBuilder builder) {
configureAuthentication(builder);
configureHostScheme(builder);
configureTimeout(builder);
configuration.getMaxConnections().ifPresent(builder::setMaxConnTotal);
configuration.getMaxConnectionsPerHost().ifPresent(builder::setMaxConnPerRoute);
builder.setThreadFactory(NamedThreadFactory.withName("OpenSearch-driver"));
return builder;
}
private void configureHostScheme(HttpAsyncClientBuilder builder) {
OpenSearchConfiguration.HostScheme scheme = configuration.getHostScheme();
switch (scheme) {
case HTTP:
return;
case HTTPS:
configureSSLOptions(builder);
return;
default:
throw new NotImplementedException(
String.format("unrecognized hostScheme '%s'", scheme.name()));
}
}
private void configureSSLOptions(HttpAsyncClientBuilder builder) {
try {
builder
.setSSLContext(sslContext())
.setSSLHostnameVerifier(hostnameVerifier());
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException | CertificateException | IOException e) {
throw new RuntimeException("Cannot set SSL options to the builder", e);
}
}
private void configureTimeout(HttpAsyncClientBuilder builder) {
builder.setDefaultRequestConfig(requestConfig());
}
private SSLContext sslContext() throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException,
CertificateException, IOException {
SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
OpenSearchConfiguration.SSLConfiguration.SSLValidationStrategy strategy = configuration.getSslConfiguration()
.getStrategy();
switch (strategy) {
case DEFAULT:
return sslContextBuilder.build();
case IGNORE:
return sslContextBuilder.loadTrustMaterial(TRUST_ALL)
.build();
case OVERRIDE:
return applyTrustStore(sslContextBuilder)
.build();
default:
throw new NotImplementedException(
String.format("unrecognized strategy '%s'", strategy.name()));
}
}
private HostnameVerifier hostnameVerifier() {
OpenSearchConfiguration.SSLConfiguration.HostNameVerifier hostnameVerifier = configuration.getSslConfiguration()
.getHostNameVerifier();
switch (hostnameVerifier) {
case DEFAULT:
return new DefaultHostnameVerifier();
case ACCEPT_ANY_HOSTNAME:
return ACCEPT_ANY_HOSTNAME;
default:
throw new NotImplementedException(
String.format("unrecognized HostNameVerifier '%s'", hostnameVerifier.name()));
}
}
private RequestConfig requestConfig() {
return RequestConfig.custom()
.setConnectTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.setConnectionRequestTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.setSocketTimeout(Math.toIntExact(configuration.getRequestTimeout().toMillis()))
.build();
}
private SSLContextBuilder applyTrustStore(SSLContextBuilder sslContextBuilder) throws CertificateException, NoSuchAlgorithmException,
KeyStoreException, IOException {
OpenSearchConfiguration.SSLConfiguration.SSLTrustStore trustStore = configuration.getSslConfiguration()
.getTrustStore()
.orElseThrow(() -> new IllegalStateException("SSLTrustStore cannot to be empty"));
return sslContextBuilder
.loadTrustMaterial(trustStore.getFile(), trustStore.getPassword());
}
private void configureAuthentication(HttpAsyncClientBuilder builder) {
configuration.getCredential()
.ifPresent(credential -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(credential.getUsername(), String.valueOf(credential.getPassword())));
builder.setDefaultCredentialsProvider(credentialsProvider);
});
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);
private final OpenSearchConfiguration configuration;
private final RestClient lowLevelRestClient;
private final OpenSearchAsyncClient openSearchClient;
private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
private final ReactorOpenSearchClient client;
@Inject
public ClientProvider(OpenSearchConfiguration configuration) {
this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
this.configuration = configuration;
this.lowLevelRestClient = buildRestClient();
this.openSearchClient = connect();
this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient);
}
private RestClient buildRestClient() {
return RestClient.builder(hostsToHttpHosts())
.setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
.build();
}
private OpenSearchAsyncClient connect() {
Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
boolean suppressLeadingZeroElements = true;
boolean suppressTrailingZeroElements = true;
return Mono.fromCallable(this::connectToCluster)
.doOnError(e -> LOGGER.warn("Error establishing OpenSearch connection. Next retry scheduled in {}",
DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e))
.retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.boundedElastic()))
.publishOn(Schedulers.boundedElastic())
.block();
}
private OpenSearchAsyncClient connectToCluster() {
LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now());
RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());
return new OpenSearchAsyncClient(transport);
}
private HttpHost[] hostsToHttpHosts() {
return configuration.getHosts().stream()
.map(host -> new HttpHost(host.getHostName(), host.getPort(), configuration.getHostScheme().name()))
.toArray(HttpHost[]::new);
}
@Override
public ReactorOpenSearchClient get() {
return client;
}
@PreDestroy
public void close() throws IOException {
lowLevelRestClient.close();
}
}