blob: 661d5b7ca6c1657eea44e8f6a7ecd16dfe56f6cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class EsRestClient {
private static EsRestClient ES_REST_CLIENT;
private static RestClient REST_CLIENT;
private EsRestClient() {
}
@SuppressWarnings("checkstyle:MagicNumber")
private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
String[] hostInfo = hosts.get(i).replace("http://", "").split(":");
httpHosts[i] = new HttpHost(hostInfo[0], Integer.parseInt(hostInfo[1]));
}
RestClientBuilder builder = RestClient.builder(httpHosts)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectionRequestTimeout(10 * 1000)
.setSocketTimeout(5 * 60 * 1000));
if (StringUtils.isNotEmpty(username)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
return builder;
}
public static EsRestClient getInstance(List<String> hosts, String username, String password) {
if (REST_CLIENT == null) {
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
REST_CLIENT = restClientBuilder.build();
ES_REST_CLIENT = new EsRestClient();
}
return ES_REST_CLIENT;
}
public BulkResponse bulk(String requestBody) {
Request request = new Request("POST", "_bulk");
request.setJsonEntity(requestBody);
try {
Response response = REST_CLIENT.performRequest(request);
if (response == null) {
throw new BulkElasticsearchException("bulk es Response is null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
ObjectMapper objectMapper = new ObjectMapper();
String entity = EntityUtils.toString(response.getEntity());
JsonNode json = objectMapper.readTree(entity);
int took = json.get("took").asInt();
boolean errors = json.get("errors").asBoolean();
return new BulkResponse(errors, took, entity);
} else {
throw new BulkElasticsearchException(String.format("bulk es response status code=%d,request boy=%s", response.getStatusLine().getStatusCode(), requestBody));
}
} catch (IOException e) {
throw new BulkElasticsearchException(String.format("bulk es error,request boy=%s", requestBody), e);
}
}
/**
* @return version.number, example:2.0.0
*/
public static String getClusterVersion() {
Request request = new Request("GET", "/");
try {
Response response = REST_CLIENT.performRequest(request);
String result = EntityUtils.toString(response.getEntity());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result);
JsonNode versionNode = jsonNode.get("version");
return versionNode.get("number").asText();
} catch (IOException e) {
throw new GetElasticsearchVersionException("fail to get elasticsearch version.", e);
}
}
public void close() throws IOException {
REST_CLIENT.close();
}
}