blob: 309bbfc352ae6e6d01e3818f45120fb74087b635 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.http.auth;
import com.google.common.base.Strings;
import lombok.SneakyThrows;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.concurrent.Callable;
public class HttpRequestCallable implements Callable<String> {
private static final Logger log = LoggerFactory.getLogger(HttpCallback.class);
private CloseableHttpClient httpclient;
private HttpCallback httpCallback;
private HttpUriRequest httpUriRequest;
private HttpClientContext context;
private SocksProxyConfig socksProxyConfig;
private static final String SOCKS_ADDRESS_KEY = "socks.address";
public HttpRequestCallable(CloseableHttpClient httpclient, HttpUriRequest httpUriRequest, HttpClientContext context,
SocksProxyConfig socksProxyConfig, HttpCallback httpCallback) {
this.httpclient = httpclient;
this.httpUriRequest = httpUriRequest;
this.context = context;
this.socksProxyConfig = socksProxyConfig;
this.httpCallback = httpCallback;
}
public void loadSocks5ProxyConfig() {
if (!Strings.isNullOrEmpty(socksProxyConfig.getSocks5Endpoint())) {
String[] socksAddrAndPor = socksProxyConfig.getSocks5Endpoint()
.split(":");
InetSocketAddress socksaddr = new InetSocketAddress(socksAddrAndPor[0],
Integer.parseInt(socksAddrAndPor[1]));
context.setAttribute(SOCKS_ADDRESS_KEY, socksaddr);
ThreadLocalProxyAuthenticator.getInstance()
.setCredentials(socksProxyConfig.getSocks5UserName(), socksProxyConfig.getSocks5Password());
}
}
@SneakyThrows
@Override
public String call() throws Exception {
CloseableHttpResponse response = null;
try {
Long startTime = System.currentTimeMillis();
loadSocks5ProxyConfig();
response = httpclient.execute(httpUriRequest, context);
if (response != null && response.getEntity() != null) {
String result = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
if (response.getStatusLine()
.getStatusCode() / 100 != 2) {
String msg = MessageFormat.format("Http Status:{0},Msg:{1}", response.getStatusLine()
.getStatusCode(), result);
httpCallback.setMsg(msg);
httpCallback.setFailed(Boolean.TRUE);
}
log.info("The cost of one http request:{}, Connection Connection={}, Keep-Alive={}",
System.currentTimeMillis() - startTime, response.getHeaders("Connection"),
response.getHeaders("Keep-Alive"));
return result;
}
} catch (Throwable e) {
log.error("http execute failed.", e);
httpCallback.setFailed(Boolean.TRUE);
httpCallback.setMsg(e.getLocalizedMessage());
} finally {
httpCallback.getCountDownLatch()
.countDown();
if (null != response.getEntity()) {
EntityUtils.consume(response.getEntity());
}
}
return null;
}
}