blob: acb7fdaea83f7a3729f0f7c68ae17f538b8132e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.library.elasticsearch;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.WebClientBuilder;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.client.logging.LoggingClient;
import com.linecorp.armeria.client.retry.RetryRule;
import com.linecorp.armeria.client.retry.RetryingClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.auth.AuthToken;
import com.linecorp.armeria.common.util.Exceptions;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.client.AliasClient;
import org.apache.skywalking.library.elasticsearch.client.DocumentClient;
import org.apache.skywalking.library.elasticsearch.client.IndexClient;
import org.apache.skywalking.library.elasticsearch.client.SearchClient;
import org.apache.skywalking.library.elasticsearch.client.TemplateClient;
import org.apache.skywalking.library.elasticsearch.requests.search.Scroll;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
import org.apache.skywalking.library.elasticsearch.response.NodeInfo;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.library.util.StringUtil;
@Slf4j
@Accessors(fluent = true)
public final class ElasticSearch implements Closeable {
private final ObjectMapper mapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@Getter
private final WebClient client;
@Getter
private final CompletableFuture<ElasticSearchVersion> version;
private final EndpointGroup endpointGroup;
private final ClientFactory clientFactory;
private final Consumer<List<Endpoint>> healthyEndpointListener;
private final TemplateClient templateClient;
private final IndexClient indexClient;
private final DocumentClient documentClient;
private final AliasClient aliasClient;
private final SearchClient searchClient;
ElasticSearch(SessionProtocol protocol,
String username, String password,
EndpointGroup endpointGroup,
ClientFactory clientFactory,
Consumer<Boolean> healthyListener,
Duration responseTimeout) {
this.endpointGroup = endpointGroup;
this.clientFactory = clientFactory;
if (healthyListener != null) {
healthyEndpointListener = it -> healthyListener.accept(!it.isEmpty());
} else {
healthyEndpointListener = it -> {
};
}
final WebClientBuilder builder =
WebClient.builder(protocol, endpointGroup)
.factory(clientFactory)
.responseTimeout(responseTimeout)
.decorator(LoggingClient.builder()
.logger(log)
.newDecorator())
.decorator(RetryingClient.builder(RetryRule.failsafe())
.maxTotalAttempts(3)
.newDecorator());
if (StringUtil.isNotBlank(username) && StringUtil.isNotBlank(password)) {
builder.auth(AuthToken.ofBasic(username, password));
}
client = builder.build();
version = new CompletableFuture<>();
templateClient = new TemplateClient(version, client);
documentClient = new DocumentClient(version, client);
indexClient = new IndexClient(version, client);
aliasClient = new AliasClient(version, client);
searchClient = new SearchClient(version, client);
}
public static ElasticSearchBuilder builder() {
return new ElasticSearchBuilder();
}
public CompletableFuture<ElasticSearchVersion> connect() {
final CompletableFuture<ElasticSearchVersion> future =
client.get("/").aggregate().thenApply(response -> {
final HttpStatus status = response.status();
if (status != HttpStatus.OK) {
throw new RuntimeException(
"Failed to connect to ElasticSearch server: " + response.contentUtf8());
}
try (final HttpData content = response.content();
final InputStream is = content.toInputStream()) {
final NodeInfo node = mapper.readValue(is, NodeInfo.class);
final String vn = node.getVersion().getNumber();
final String distribution = node.getVersion().getDistribution();
return ElasticSearchVersion.of(distribution, vn);
} catch (IOException e) {
return Exceptions.throwUnsafely(e);
}
});
future.whenComplete((v, throwable) -> {
if (throwable != null) {
final RuntimeException cause =
new RuntimeException("Failed to determine ElasticSearch version", throwable);
version.completeExceptionally(cause);
healthyEndpointListener.accept(Collections.emptyList());
return;
}
log.info("ElasticSearch version is: {}", v);
version.complete(v);
});
endpointGroup.whenReady().thenAccept(healthyEndpointListener);
endpointGroup.addListener(healthyEndpointListener);
return future;
}
public TemplateClient templates() {
return templateClient;
}
public DocumentClient documents() {
return documentClient;
}
public IndexClient index() {
return indexClient;
}
public AliasClient alias() {
return aliasClient;
}
public SearchResponse search(Search search, SearchParams params, String... index) {
return searchClient.search(search, params, index);
}
public SearchResponse search(Search search, String... index) {
return search(search, null, index);
}
public SearchResponse scroll(Duration contextRetention, String scrollId) {
return searchClient.scroll(
Scroll.builder()
.contextRetention(contextRetention)
.scrollId(scrollId)
.build());
}
@Override
public void close() {
endpointGroup.removeListener(healthyEndpointListener);
clientFactory.close();
endpointGroup.close();
}
}