blob: d8a220c97f67b4ad296f390459ab753367b802a7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.http;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.errors.RetriableException;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.http.auth.Auth;
import org.apache.rocketmq.connect.http.auth.BasicAuthImpl;
import org.apache.rocketmq.connect.http.auth.ApiKeyAuthImpl;
import org.apache.rocketmq.connect.http.auth.OAuthClientImpl;
import org.apache.rocketmq.connect.http.constant.AuthTypeEnum;
import org.apache.rocketmq.connect.http.constant.HttpConstant;
import org.apache.rocketmq.connect.http.auth.ApacheHttpClientImpl;
import org.apache.rocketmq.connect.http.auth.AbstractHttpClient;
import org.apache.rocketmq.connect.http.auth.HttpCallback;
import org.apache.rocketmq.connect.http.entity.HttpBasicAuthParameters;
import org.apache.rocketmq.connect.http.entity.HttpApiKeyAuthParameters;
import org.apache.rocketmq.connect.http.entity.HttpAuthParameters;
import org.apache.rocketmq.connect.http.entity.HttpOAuthParameters;
import org.apache.rocketmq.connect.http.entity.HttpRequest;
import org.apache.rocketmq.connect.http.entity.ProxyConfig;
import org.apache.rocketmq.connect.http.util.CheckUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class HttpSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(HttpSinkTask.class);
protected static final int DEFAULT_CONSUMER_TIMEOUT_SECONDS = 30;
protected static final String DEFAULT_REQUEST_TIMEOUT_MILL_SECONDS = "3000";
protected static final int DEFAULT_OAUTH_DELAY_SECONDS = 1;
protected static final int DEFAULT_CONCURRENCY = 1;
private String url;
private String method;
private String headerParameters;
private String fixedHeaderParameters;
private String queryParameters;
private String fixedQueryParameters;
private String body;
private String socks5UserName;
private String socks5Password;
private String socks5Endpoint;
private String timeout;
private String concurrency;
private String authType;
private String basicUsername;
private String basicPassword;
private String apiKeyUsername;
private String apiKeyPassword;
private String oAuthEndpoint;
private String oAuthHttpMethod;
private String oAuthClientId;
private String oAuthClientSecret;
private String oAuthHeaderParameters;
private String oAuthQueryParameters;
private String oAuthBody;
private String token;
private Long awaitTimeoutSeconds;
private Auth auth;
private HttpAuthParameters httpAuthParameters;
private ScheduledExecutorService scheduledExecutorService;
private AbstractHttpClient httpClient;
@Override
public void put(List<ConnectRecord> records) throws ConnectException {
try {
Long startTime = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(records.size());
HttpCallback httpCallback = new HttpCallback(countDownLatch);
for (ConnectRecord connectRecord : records) {
// body
// header
Map<String, String> headerMap = renderHeaderMap(headerParameters, fixedHeaderParameters, token);
if (auth != null) {
headerMap.putAll(auth.auth());
}
// render query to url
String urlWithQueryParameters = renderQueryParametersToUrl(url, queryParameters, fixedQueryParameters);
HttpRequest httpRequest = new HttpRequest();
httpRequest.setUrl(urlWithQueryParameters);
httpRequest.setMethod(method);
httpRequest.setHeaderMap(headerMap);
if (body != null) {
httpRequest.setBody(body);
} else {
httpRequest.setBody(new Gson().toJson(connectRecord.getData()));
}
httpRequest.setTimeout(StringUtils.isNotBlank(timeout) ? timeout : DEFAULT_REQUEST_TIMEOUT_MILL_SECONDS);
log.info("HttpSinkTask send request | url:{} | method: {} | headerMap: {} | body: {}",
httpRequest.getUrl(), httpRequest.getMethod(), httpRequest.getHeaderMap(), httpRequest.getBody());
httpClient.executeNotReturn(httpRequest, httpCallback);
}
boolean consumeSucceed = Boolean.FALSE;
try {
consumeSucceed = countDownLatch.await(awaitTimeoutSeconds, TimeUnit.SECONDS);
} catch (Throwable e) {
log.error("count down latch failed.", e);
}
if (!consumeSucceed) {
throw new RetriableException("Request Timeout");
}
if (httpCallback.isFailed()) {
throw new RetriableException(httpCallback.getMsg());
}
log.info("HttpSinkTask put size:{},cost:{}", records.size(), System.currentTimeMillis() - startTime);
} catch (Exception e) {
log.error("HttpSinkTask | put | error => ", e);
throw new RuntimeException(e);
}
}
private Map<String, String> renderHeaderMap(String headerParameters, String fixedHeaderParameters, String token) {
Map<String, String> headerMap = new HashMap<>();
if (headerParameters != null) {
Map<String, String> userDefinedHeaders = new Gson().fromJson(headerParameters, new TypeToken<Map<String, String>>() {
}.getType());
headerMap.putAll(userDefinedHeaders);
}
if (fixedHeaderParameters != null) {
Map<String, String> fixedHeaders = new Gson().fromJson(fixedHeaderParameters, new TypeToken<Map<String, String>>() {
}.getType());
headerMap.putAll(fixedHeaders);
}
if (StringUtils.isNotBlank(token)) {
headerMap.put(HttpConstant.TOKEN, token);
}
return headerMap;
}
private String renderQueryParametersToUrl(String url, String queryParameters, String fixedQueryParameters) throws UnsupportedEncodingException {
Map<String, Object> queryParametersMap = new HashMap<>();
if (queryParameters != null) {
queryParametersMap.putAll(new Gson().fromJson(queryParameters, Map.class));
}
if (fixedQueryParameters != null) {
queryParametersMap.putAll(new Gson().fromJson(fixedQueryParameters, Map.class));
}
StringBuilder queryParametersStringBuilder = new StringBuilder();
for (Map.Entry<String, Object> entry : queryParametersMap.entrySet()) {
if (entry.getValue() instanceof JSONObject) {
queryParametersStringBuilder.append(URLEncoder.encode(entry.getKey(), "UTF-8")).append("=").append(URLEncoder.encode(((JSONObject) entry.getValue()).toJSONString(), "UTF-8")).append("&");
} else {
queryParametersStringBuilder.append(URLEncoder.encode(entry.getKey(), "UTF-8")).append("=").append(URLEncoder.encode((String) entry.getValue(), "UTF-8")).append("&");
}
}
String path = queryParametersStringBuilder.toString();
if (StringUtils.isNotBlank(path) && StringUtils.isNotBlank(url)) {
if (url.contains("?")) {
return url + "&" + path.substring(0, path.length() - 1);
}
return url + "?" + path.substring(0, path.length() - 1);
}
return url;
}
@Override
public void validate(KeyValue config) {
if (CheckUtils.checkNull(config.getString(HttpConstant.URL)) ||
CheckUtils.checkNull(config.getString(HttpConstant.METHOD))) {
throw new RuntimeException("http required parameter is null !");
}
}
@Override
public void start(KeyValue config) {
// http
url = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.URL));
method = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.METHOD));
headerParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.HEADER_PARAMETERS));
fixedHeaderParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.FIXED_HEADER_PARAMETERS));
queryParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.QUERY_PARAMETERS));
fixedQueryParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.FIXED_QUERY_PARAMETERS));
body = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BODY));
socks5Endpoint = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.SOCKS5_ENDPOINT));
socks5UserName = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.SOCKS5_USERNAME));
socks5Password = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.SOCKS5_PASSWORD));
timeout = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.TIMEOUT));
concurrency = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.CONCURRENCY));
authType = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.AUTH_TYPE));
basicUsername = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_USERNAME));
basicPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.BASIC_PASSWORD));
apiKeyUsername = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_USERNAME));
apiKeyPassword = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.API_KEY_PASSWORD));
oAuthEndpoint = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH_ENDPOINT));
oAuthHttpMethod = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH_HTTP_METHOD));
oAuthClientId = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH_CLIENT_ID));
oAuthClientSecret = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH_CLIENT_SECRET));
oAuthHeaderParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH_HEADER_PARAMETERS));
oAuthQueryParameters = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH_QUERY_PARAMETERS));
oAuthBody = CheckUtils.checkNullReturnDefault(config.getString(HttpConstant.OAUTH_BODY));
token = config.getString(HttpConstant.TOKEN);
try {
httpClient = new ApacheHttpClientImpl(StringUtils.isNotBlank(concurrency) ? Integer.parseInt(concurrency) : DEFAULT_CONCURRENCY);
ProxyConfig proxyConfig = new ProxyConfig();
proxyConfig.setSocks5Endpoint(socks5Endpoint);
proxyConfig.setSocks5UserName(socks5UserName);
proxyConfig.setSocks5Password(socks5Password);
httpClient.init(proxyConfig);
initAuth(httpClient);
awaitTimeoutSeconds = Long.min(StringUtils.isNotBlank(timeout) ? Long.parseLong(timeout) + 5 : DEFAULT_CONSUMER_TIMEOUT_SECONDS, DEFAULT_CONSUMER_TIMEOUT_SECONDS);
} catch (Exception e) {
log.error("HttpSinkTask | start | error => ", e);
}
log.info("HttpSinkTask | start | success => config : {}", new Gson().toJson(config));
}
@Override
public void stop() {
httpClient.close();
if (scheduledExecutorService != null) {
this.scheduledExecutorService.shutdownNow();
this.scheduledExecutorService = null;
}
}
private void initAuth(AbstractHttpClient httpClient) throws Exception {
httpAuthParameters = initAuthParameters();
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
scheduledExecutorService = null;
}
if (httpAuthParameters == null || StringUtils.isBlank(httpAuthParameters.getAuthType())) {
this.auth = null;
return;
}
if (httpAuthParameters.getAuthType().equals(AuthTypeEnum.BASIC.getAuthType())) {
this.auth = new BasicAuthImpl(httpAuthParameters);
} else if (httpAuthParameters.getAuthType().equals(AuthTypeEnum.API_KEY.getAuthType())) {
this.auth = new ApiKeyAuthImpl(httpAuthParameters);
} else if (httpAuthParameters.getAuthType().equals(AuthTypeEnum.OAUTH_CLIENT_CREDENTIALS.getAuthType())) {
if (httpClient == null) {
log.error("HttpSinkTask | initOrUpdateAuth | httpClient is null !");
throw new RuntimeException("httpClient is null ! init httpClient before initOrUpdateAuth");
}
OAuthClientImpl oAuthClient = new OAuthClientImpl(httpAuthParameters);
oAuthClient.setHttpClient(httpClient);
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleWithFixedDelay(oAuthClient, DEFAULT_OAUTH_DELAY_SECONDS, DEFAULT_OAUTH_DELAY_SECONDS, TimeUnit.SECONDS);
this.auth = oAuthClient;
} else {
this.auth = null;
}
}
private HttpAuthParameters initAuthParameters() {
HttpBasicAuthParameters httpBasicAuthParameters = new HttpBasicAuthParameters(basicUsername, basicPassword);
HttpApiKeyAuthParameters httpApiKeyAuthParameters = new HttpApiKeyAuthParameters(apiKeyUsername, apiKeyPassword);
HttpOAuthParameters httpOAuthParameters = new HttpOAuthParameters(oAuthEndpoint, oAuthHttpMethod, oAuthClientId, oAuthClientSecret, oAuthHeaderParameters, oAuthQueryParameters, oAuthBody, DEFAULT_REQUEST_TIMEOUT_MILL_SECONDS);
return new HttpAuthParameters(authType, httpBasicAuthParameters, httpApiKeyAuthParameters, httpOAuthParameters);
}
/*
* update proxy
* */
protected void updateProxy(ProxyConfig proxyConfig) {
this.httpClient.updateProxyConfig(proxyConfig);
}
protected void updateAuth(String authType,
String basicUsername, String basicPassword,
String apiKeyUsername, String apiKeyPassword,
String oAuthEndpoint, String oAuthHttpMethod, String oAuthClientId, String oAuthClientSecret, String oAuthHeaderParameters, String oAuthQueryParameters, String oAuthBody) {
if (AuthTypeEnum.parse(authType) == null) {
throw new RuntimeException("authType is null !");
}
this.authType = authType;
this.basicUsername = basicUsername;
this.basicPassword = basicPassword;
this.apiKeyUsername = apiKeyUsername;
this.apiKeyPassword = apiKeyPassword;
this.oAuthEndpoint = oAuthEndpoint;
this.oAuthHttpMethod = oAuthHttpMethod;
this.oAuthClientId = oAuthClientId;
this.oAuthClientSecret = oAuthClientSecret;
this.oAuthHeaderParameters = oAuthHeaderParameters;
this.oAuthQueryParameters = oAuthQueryParameters;
this.oAuthBody = oAuthBody;
try {
initAuth(httpClient);
} catch (Exception e) {
log.error("HttpSinkTask | updateAuth | error => ", e);
}
}
protected Auth getAuth() {
return auth;
}
protected AbstractHttpClient getHttpClient() {
return httpClient;
}
protected Map<String, String> getCustomHeaders() throws Exception {
return renderHeaderMap(headerParameters, fixedHeaderParameters, token);
}
protected String getCustomUrlWithQueryParameters() throws Exception {
return renderQueryParametersToUrl(url, queryParameters, fixedQueryParameters);
}
protected String getCustomBody(ConnectRecord connectRecord) throws Exception {
return body;
}
}