blob: 0eae378722fc0536c22457c0b3bf88ffc7faedf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.http.client;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Slf4j
public class HttpClientProvider implements AutoCloseable {
private static final String ENCODING = "UTF-8";
private static final String APPLICATION_JSON = "application/json";
private static final int CONNECT_TIMEOUT = 6000 * 2;
private static final int SOCKET_TIMEOUT = 6000 * 10;
private static final int INITIAL_CAPACITY = 16;
private static final RequestConfig REQUEST_CONFIG = RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT)
.setSocketTimeout(SOCKET_TIMEOUT)
.build();
private final CloseableHttpClient httpClient;
private final Retryer<CloseableHttpResponse> retryer;
public HttpClientProvider(HttpParameter httpParameter) {
this.httpClient = HttpClients.createDefault();
this.retryer = buildRetryer(httpParameter);
}
private Retryer<CloseableHttpResponse> buildRetryer(HttpParameter httpParameter) {
if (httpParameter.getRetry() < 1) {
return RetryerBuilder.<CloseableHttpResponse>newBuilder().build();
}
return RetryerBuilder.<CloseableHttpResponse>newBuilder()
.retryIfException(ex -> ExceptionUtils.indexOfType(ex, IOException.class) != -1)
.withStopStrategy(StopStrategies.stopAfterAttempt(httpParameter.getRetry()))
.withWaitStrategy(WaitStrategies.fibonacciWait(httpParameter.getRetryBackoffMultiplierMillis(),
httpParameter.getRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasException()) {
log.warn(String.format("[%d] request http failed",
attempt.getAttemptNumber()), attempt.getExceptionCause());
}
}
})
.build();
}
public HttpResponse execute(String url, String method, Map<String, String> headers, Map<String, String> params) throws Exception {
// convert method option to uppercase
method = method.toUpperCase(Locale.ROOT);
if (HttpPost.METHOD_NAME.equals(method)) {
return doPost(url, headers, params);
}
if (HttpGet.METHOD_NAME.equals(method)) {
return doGet(url, headers, params);
}
if (HttpPut.METHOD_NAME.equals(method)) {
return doPut(url, params);
}
if (HttpDelete.METHOD_NAME.equals(method)) {
return doDelete(url, params);
}
// if http method that user assigned is not support by http provider, default do get
return doGet(url, headers, params);
}
/**
* Send a get request without request headers and request parameters
*
* @param url request address
* @return http response result
* @throws Exception information
*/
public HttpResponse doGet(String url) throws Exception {
return doGet(url, Collections.emptyMap(), Collections.emptyMap());
}
/**
* Send a get request with request parameters
*
* @param url request address
* @param params request parameter map
* @return http response result
* @throws Exception information
*/
public HttpResponse doGet(String url, Map<String, String> params) throws Exception {
return doGet(url, Collections.emptyMap(), params);
}
/**
* Send a get request with request headers and request parameters
*
* @param url request address
* @param headers request header map
* @param params request parameter map
* @return http response result
* @throws Exception information
*/
public HttpResponse doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
// Create access address
URIBuilder uriBuilder = new URIBuilder(url);
// add parameter to uri
addParameters(uriBuilder, params);
// create a new http get
HttpGet httpGet = new HttpGet(uriBuilder.build());
// set default request config
httpGet.setConfig(REQUEST_CONFIG);
// set request header
addHeaders(httpGet, headers);
// return http response
return getResponse(httpGet);
}
/**
* Send a post request without request headers and request parameters
*
* @param url request address
* @return http response result
* @throws Exception information
*/
public HttpResponse doPost(String url) throws Exception {
return doPost(url, Collections.emptyMap(), Collections.emptyMap());
}
/**
* Send post request with request parameters
*
* @param url request address
* @param params request parameter map
* @return http response result
* @throws Exception information
*/
public HttpResponse doPost(String url, Map<String, String> params) throws Exception {
return doPost(url, Collections.emptyMap(), params);
}
/**
* Send a post request with request headers and request parameters
*
* @param url request address
* @param headers request header map
* @param params request parameter map
* @return http response result
* @throws Exception information
*/
public HttpResponse doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
// create a new http get
HttpPost httpPost = new HttpPost(url);
// set default request config
httpPost.setConfig(REQUEST_CONFIG);
// set request header
addHeaders(httpPost, headers);
// set request params
addParameters(httpPost, params);
// return http response
return getResponse(httpPost);
}
/**
* Send a post request with request body and without headers
* @param url request address
* @param body request body conetent
* @return http response result
* @throws Exception information
*/
public HttpResponse doPost(String url, String body) throws Exception {
return doPost(url, Collections.emptyMap(), body);
}
/**
* Send a post request with request headers and request body
* @param url request address
* @param headers request header map
* @param body request body content
* @return http response result
* @throws Exception information
*/
public HttpResponse doPost(String url, Map<String, String> headers, String body) throws Exception {
// create a new http post
HttpPost httpPost = new HttpPost(url);
// set default request config
httpPost.setConfig(REQUEST_CONFIG);
// set request header
addHeaders(httpPost, headers);
// add body in request
addBody(httpPost, body);
// return http response
return getResponse(httpPost);
}
/**
* Send a put request without request parameters
*
* @param url request address
* @return http response result
* @throws Exception information
*/
public HttpResponse doPut(String url) throws Exception {
return doPut(url, Collections.emptyMap());
}
/**
* Send a put request with request parameters
*
* @param url request address
* @param params request parameter map
* @return http response result
* @throws Exception information
*/
public HttpResponse doPut(String url, Map<String, String> params) throws Exception {
// create a new http put
HttpPut httpPut = new HttpPut(url);
// set default request config
httpPut.setConfig(REQUEST_CONFIG);
// set request params
addParameters(httpPut, params);
// return http response
return getResponse(httpPut);
}
/**
* Send delete request without request parameters
*
* @param url request address
* @return http response result
* @throws Exception information
*/
public HttpResponse doDelete(String url) throws Exception {
// create a new http delete
HttpDelete httpDelete = new HttpDelete(url);
// set default request config
httpDelete.setConfig(REQUEST_CONFIG);
// return http response
return getResponse(httpDelete);
}
/**
* Send delete request with request parameters
*
* @param url request address
* @param params request parameter map
* @return http response result
* @throws Exception information
*/
public HttpResponse doDelete(String url, Map<String, String> params) throws Exception {
if (params == null) {
params = new HashMap<>(INITIAL_CAPACITY);
}
params.put("_method", "delete");
return doPost(url, params);
}
private HttpResponse getResponse(HttpRequestBase request) throws Exception {
// execute request
try (CloseableHttpResponse httpResponse = retryWithException(request)) {
// get return result
if (httpResponse != null && httpResponse.getStatusLine() != null) {
String content = "";
if (httpResponse.getEntity() != null) {
content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
}
return new HttpResponse(httpResponse.getStatusLine().getStatusCode(), content);
}
}
return new HttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
}
private CloseableHttpResponse retryWithException(HttpRequestBase request) throws Exception {
return retryer.call(() -> httpClient.execute(request));
}
private void addParameters(URIBuilder builder, Map<String, String> params) {
if (Objects.isNull(params) || params.isEmpty()) {
return;
}
params.forEach(builder::setParameter);
}
private void addParameters(HttpEntityEnclosingRequestBase request, Map<String, String> params) throws UnsupportedEncodingException {
if (Objects.isNull(params) || params.isEmpty()) {
return;
}
List<NameValuePair> parameters = new ArrayList<>();
Set<Map.Entry<String, String>> entrySet = params.entrySet();
for (Map.Entry<String, String> e : entrySet) {
String name = e.getKey();
String value = e.getValue();
NameValuePair pair = new BasicNameValuePair(name, value);
parameters.add(pair);
}
// Set to the request's http object
request.setEntity(new UrlEncodedFormEntity(parameters, ENCODING));
}
private void addHeaders(HttpRequestBase request, Map<String, String> headers) {
if (Objects.isNull(headers) || headers.isEmpty()) {
return;
}
headers.forEach(request::addHeader);
}
private void addBody(HttpEntityEnclosingRequestBase request, String body) {
request.addHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON);
StringEntity entity = new StringEntity(body, ContentType.APPLICATION_JSON);
entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON));
request.setEntity(entity);
}
@Override
public void close() throws IOException {
if (Objects.nonNull(httpClient)) {
httpClient.close();
}
}
}