Implemented connection routing for the async request exec chain
git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@1794170 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/external/HttpAsyncClientCompatibilityTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/external/HttpAsyncClientCompatibilityTest.java
index e65defe..ffdb17a 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/external/HttpAsyncClientCompatibilityTest.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/external/HttpAsyncClientCompatibilityTest.java
@@ -29,17 +29,15 @@
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
-import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.Credentials;
+import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
@@ -48,6 +46,8 @@
import org.apache.hc.client5.http.impl.sync.BasicCredentialsProvider;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpStatus;
@@ -55,19 +55,39 @@
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.TextUtils;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
public class HttpAsyncClientCompatibilityTest {
public static void main(final String... args) throws Exception {
final HttpAsyncClientCompatibilityTest[] tests = new HttpAsyncClientCompatibilityTest[] {
new HttpAsyncClientCompatibilityTest(
- HttpVersion.HTTP_1_1, new HttpHost("localhost", 8080, "http"), null, null),
+ HttpVersion.HTTP_1_1,
+ new HttpHost("localhost", 8080, "http"), null, null),
new HttpAsyncClientCompatibilityTest(
- HttpVersion.HTTP_2_0, new HttpHost("localhost", 8080, "http"), null, null),
+ HttpVersion.HTTP_1_1,
+ new HttpHost("test-httpd", 8080, "http"), new HttpHost("localhost", 8888), null),
new HttpAsyncClientCompatibilityTest(
- HttpVersion.HTTP_1_1, new HttpHost("localhost", 8443, "https"), null, null),
+ HttpVersion.HTTP_1_1,
+ new HttpHost("test-httpd", 8080, "http"), new HttpHost("localhost", 8889),
+ new UsernamePasswordCredentials("squid", "nopassword".toCharArray())),
new HttpAsyncClientCompatibilityTest(
- HttpVersion.HTTP_2_0, new HttpHost("localhost", 8443, "https"), null, null)
+ HttpVersion.HTTP_1_1,
+ new HttpHost("localhost", 8443, "https"), null, null),
+ new HttpAsyncClientCompatibilityTest(
+ HttpVersion.HTTP_1_1,
+ new HttpHost("test-httpd", 8443, "https"), new HttpHost("localhost", 8888), null),
+ new HttpAsyncClientCompatibilityTest(
+ HttpVersion.HTTP_1_1,
+ new HttpHost("test-httpd", 8443, "https"), new HttpHost("localhost", 8889),
+ new UsernamePasswordCredentials("squid", "nopassword".toCharArray()))
+// new HttpAsyncClientCompatibilityTest(
+// HttpVersion.HTTP_2_0,
+// new HttpHost("localhost", 8080, "http"), null, null),
+// new HttpAsyncClientCompatibilityTest(
+// HttpVersion.HTTP_2_0,
+// new HttpHost("localhost", 8443, "https"), null, null)
};
for (final HttpAsyncClientCompatibilityTest test: tests) {
try {
@@ -78,6 +98,8 @@
}
}
+ private static final Timeout TIMEOUT = Timeout.ofSeconds(5);
+
private final HttpVersion protocolVersion;
private final HttpHost target;
private final HttpHost proxy;
@@ -108,6 +130,7 @@
this.client = HttpAsyncClients.custom()
.setVersionPolicy(this.protocolVersion == HttpVersion.HTTP_2 ? HttpVersionPolicy.FORCE_HTTP_2 : HttpVersionPolicy.FORCE_HTTP_1)
.setConnectionManager(this.connManager)
+ .setProxy(this.proxy)
.setDefaultRequestConfig(requestConfig)
.build();
}
@@ -144,10 +167,10 @@
final HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credentialsProvider);
- final SimpleHttpRequest options = new SimpleHttpRequest("OPTIONS", target, "*", null, null);
- final Future<SimpleHttpResponse> future = client.execute(new SimpleRequestProducer(options), new SimpleResponseConsumer(), null);
+ final SimpleHttpRequest options = SimpleHttpRequest.options(target, "*");
+ final Future<SimpleHttpResponse> future = client.execute(options, context, null);
try {
- final SimpleHttpResponse response = future.get(5, TimeUnit.SECONDS);
+ final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
final int code = response.getCode();
if (code == HttpStatus.SC_OK) {
logResult(TestResult.OK, options, Objects.toString(response.getFirstHeader("server")));
@@ -163,15 +186,16 @@
}
// Basic GET requests
{
+ connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
final HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credentialsProvider);
final String[] requestUris = new String[] {"/", "/news.html", "/status.html"};
for (String requestUri: requestUris) {
- final SimpleHttpRequest httpGet = new SimpleHttpRequest("GET", target, requestUri, null, null);
- final Future<SimpleHttpResponse> future = client.execute(new SimpleRequestProducer(httpGet), new SimpleResponseConsumer(), null);
+ final SimpleHttpRequest httpGet = SimpleHttpRequest.get(target, requestUri);
+ final Future<SimpleHttpResponse> future = client.execute(httpGet, context, null);
try {
- final SimpleHttpResponse response = future.get(5, TimeUnit.SECONDS);
+ final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
final int code = response.getCode();
if (code == HttpStatus.SC_OK) {
logResult(TestResult.OK, httpGet, "200");
@@ -186,6 +210,112 @@
}
}
}
+ // Wrong target auth scope
+ {
+ connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
+ credentialsProvider.setCredentials(
+ new AuthScope("otherhost", AuthScope.ANY_PORT, "Restricted Files"),
+ new UsernamePasswordCredentials("testuser", "nopassword".toCharArray()));
+ final HttpClientContext context = HttpClientContext.create();
+ context.setCredentialsProvider(credentialsProvider);
+
+ final SimpleHttpRequest httpGetSecret = SimpleHttpRequest.get(target, "/private/big-secret.txt");
+ final Future<SimpleHttpResponse> future = client.execute(httpGetSecret, context, null);
+ try {
+ final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ final int code = response.getCode();
+ if (code == HttpStatus.SC_UNAUTHORIZED) {
+ logResult(TestResult.OK, httpGetSecret, "401 (wrong target auth scope)");
+ } else {
+ logResult(TestResult.NOK, httpGetSecret, "(status " + code + ")");
+ }
+ } catch (ExecutionException ex) {
+ final Throwable cause = ex.getCause();
+ logResult(TestResult.NOK, httpGetSecret, "(" + cause.getMessage() + ")");
+ } catch (TimeoutException ex) {
+ logResult(TestResult.NOK, httpGetSecret, "(time out)");
+ }
+ }
+ // Wrong target credentials
+ {
+ connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
+ credentialsProvider.setCredentials(
+ new AuthScope(target),
+ new UsernamePasswordCredentials("testuser", "wrong password".toCharArray()));
+ final HttpClientContext context = HttpClientContext.create();
+ context.setCredentialsProvider(credentialsProvider);
+
+ final SimpleHttpRequest httpGetSecret = SimpleHttpRequest.get(target, "/private/big-secret.txt");
+ final Future<SimpleHttpResponse> future = client.execute(httpGetSecret, context, null);
+ try {
+ final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ final int code = response.getCode();
+ if (code == HttpStatus.SC_UNAUTHORIZED) {
+ logResult(TestResult.OK, httpGetSecret, "401 (wrong target creds)");
+ } else {
+ logResult(TestResult.NOK, httpGetSecret, "(status " + code + ")");
+ }
+ } catch (ExecutionException ex) {
+ final Throwable cause = ex.getCause();
+ logResult(TestResult.NOK, httpGetSecret, "(" + cause.getMessage() + ")");
+ } catch (TimeoutException ex) {
+ logResult(TestResult.NOK, httpGetSecret, "(time out)");
+ }
+ }
+ // Correct target credentials
+ {
+ connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
+ credentialsProvider.setCredentials(
+ new AuthScope(target),
+ new UsernamePasswordCredentials("testuser", "nopassword".toCharArray()));
+ final HttpClientContext context = HttpClientContext.create();
+ context.setCredentialsProvider(credentialsProvider);
+
+ final SimpleHttpRequest httpGetSecret = SimpleHttpRequest.get(target, "/private/big-secret.txt");
+ final Future<SimpleHttpResponse> future = client.execute(httpGetSecret, context, null);
+ try {
+ final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ final int code = response.getCode();
+ if (code == HttpStatus.SC_OK) {
+ logResult(TestResult.OK, httpGetSecret, "200 (correct target creds)");
+ } else {
+ logResult(TestResult.NOK, httpGetSecret, "(status " + code + ")");
+ }
+ } catch (ExecutionException ex) {
+ final Throwable cause = ex.getCause();
+ logResult(TestResult.NOK, httpGetSecret, "(" + cause.getMessage() + ")");
+ } catch (TimeoutException ex) {
+ logResult(TestResult.NOK, httpGetSecret, "(time out)");
+ }
+ }
+ // Correct target credentials (no keep-alive)
+ if (protocolVersion.lessEquals(HttpVersion.HTTP_1_1))
+ {
+ connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
+ credentialsProvider.setCredentials(
+ new AuthScope(target),
+ new UsernamePasswordCredentials("testuser", "nopassword".toCharArray()));
+ final HttpClientContext context = HttpClientContext.create();
+ context.setCredentialsProvider(credentialsProvider);
+
+ final SimpleHttpRequest httpGetSecret = SimpleHttpRequest.get(target, "/private/big-secret.txt");
+ httpGetSecret.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
+ final Future<SimpleHttpResponse> future = client.execute(httpGetSecret, context, null);
+ try {
+ final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+ final int code = response.getCode();
+ if (code == HttpStatus.SC_OK) {
+ logResult(TestResult.OK, httpGetSecret, "200 (correct target creds / no keep-alive)");
+ } else {
+ logResult(TestResult.NOK, httpGetSecret, "(status " + code + ")");
+ }
+ } catch (ExecutionException ex) {
+ final Throwable cause = ex.getCause();
+ logResult(TestResult.NOK, httpGetSecret, "(" + cause.getMessage() + ")");
+ } catch (TimeoutException ex) {
+ logResult(TestResult.NOK, httpGetSecret, "(time out)");
+ }
+ }
}
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java
new file mode 100644
index 0000000..c2dea36
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java
@@ -0,0 +1,394 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.RouteTracker;
+import org.apache.hc.client5.http.async.AsyncExecCallback;
+import org.apache.hc.client5.http.async.AsyncExecChain;
+import org.apache.hc.client5.http.async.AsyncExecChainHandler;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.auth.AuthExchange;
+import org.apache.hc.client5.http.auth.ChallengeType;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
+import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
+import org.apache.hc.client5.http.impl.sync.TunnelRefusedException;
+import org.apache.hc.client5.http.protocol.AuthenticationStrategy;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.client5.http.routing.HttpRouteDirector;
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.util.Args;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Request executor in the HTTP request execution chain
+ * that is responsible for establishing connection to the target
+ * origin server as specified by the current route.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+public final class AsyncConnectExec implements AsyncExecChainHandler {
+
+ private final Logger log = LogManager.getLogger(getClass());
+
+ private final HttpProcessor proxyHttpProcessor;
+ private final AuthenticationStrategy proxyAuthStrategy;
+ private final HttpAuthenticator authenticator;
+ private final HttpRouteDirector routeDirector;
+
+ public AsyncConnectExec(
+ final HttpProcessor proxyHttpProcessor,
+ final AuthenticationStrategy proxyAuthStrategy) {
+ Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
+ Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
+ this.proxyHttpProcessor = proxyHttpProcessor;
+ this.proxyAuthStrategy = proxyAuthStrategy;
+ this.authenticator = new HttpAuthenticator();
+ this.routeDirector = new BasicRouteDirector();
+ }
+
+ static class State {
+
+ State(final HttpRoute route) {
+ tracker = new RouteTracker(route);
+ }
+
+ final RouteTracker tracker;
+
+ volatile boolean challenged;
+ volatile boolean tunnelRefused;
+
+ }
+
+ @Override
+ public void execute(
+ final HttpRequest request,
+ final AsyncEntityProducer entityProducer,
+ final AsyncExecChain.Scope scope,
+ final AsyncExecChain chain,
+ final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+ Args.notNull(request, "HTTP request");
+ Args.notNull(scope, "Scope");
+
+ final String exchangeId = scope.exchangeId;
+ final HttpRoute route = scope.route;
+ final HttpClientContext clientContext = scope.clientContext;
+ final AsyncExecRuntime execRuntime = scope.execRuntime;
+ final State state = new State(route);
+
+ final Runnable routeInitiation = new Runnable() {
+
+ @Override
+ public void run() {
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": connection acquired");
+ }
+ if (execRuntime.isConnected()) {
+ try {
+ chain.proceed(request, entityProducer, scope, asyncExecCallback);
+ } catch (final HttpException | IOException ex) {
+ asyncExecCallback.failed(ex);
+ }
+ } else {
+ proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
+ }
+ }
+
+ };
+
+ if (!execRuntime.isConnectionAcquired()) {
+ final Object userToken = clientContext.getUserToken();
+ if (log.isDebugEnabled()) {
+ log.debug(exchangeId + ": acquiring connection with route " + route);
+ }
+ execRuntime.acquireConnection(route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
+
+ @Override
+ public void completed(final AsyncExecRuntime execRuntime) {
+ routeInitiation.run();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ asyncExecCallback.failed(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ asyncExecCallback.failed(new InterruptedIOException());
+ }
+
+ });
+ } else {
+ routeInitiation.run();
+ }
+
+ }
+
+ private void proceedToNextHop(
+ final State state,
+ final HttpRequest request,
+ final AsyncEntityProducer entityProducer,
+ final AsyncExecChain.Scope scope,
+ final AsyncExecChain chain,
+ final AsyncExecCallback asyncExecCallback) {
+ final RouteTracker tracker = state.tracker;
+ final AsyncExecRuntime execRuntime = scope.execRuntime;
+ final HttpRoute route = scope.route;
+ final HttpClientContext clientContext = scope.clientContext;
+
+ int step;
+ do {
+ final HttpRoute fact = tracker.toRoute();
+ step = routeDirector.nextStep(route, fact);
+ switch (step) {
+ case HttpRouteDirector.CONNECT_TARGET:
+ execRuntime.connect(clientContext, new FutureCallback<AsyncExecRuntime>() {
+
+ @Override
+ public void completed(final AsyncExecRuntime execRuntime) {
+ tracker.connectTarget(route.isSecure());
+ log.debug("Connected to target");
+ proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ asyncExecCallback.failed(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ asyncExecCallback.failed(new InterruptedIOException());
+ }
+
+ });
+ return;
+
+ case HttpRouteDirector.CONNECT_PROXY:
+ execRuntime.connect(clientContext, new FutureCallback<AsyncExecRuntime>() {
+
+ @Override
+ public void completed(final AsyncExecRuntime execRuntime) {
+ final HttpHost proxy = route.getProxyHost();
+ tracker.connectProxy(proxy, false);
+ log.debug("Connected to proxy");
+ proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ asyncExecCallback.failed(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ asyncExecCallback.failed(new InterruptedIOException());
+ }
+
+ });
+ return;
+
+ case HttpRouteDirector.TUNNEL_TARGET:
+ try {
+ final HttpHost proxy = route.getProxyHost();
+ final HttpHost target = route.getTargetHost();
+ createTunnel(state, proxy ,target, scope, chain, new AsyncExecCallback() {
+
+ @Override
+ public AsyncDataConsumer handleResponse(
+ final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+ return asyncExecCallback.handleResponse(response, entityDetails);
+ }
+
+ @Override
+ public void completed() {
+ log.debug("Tunnel to target created");
+ tracker.tunnelTarget(false);
+ proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ asyncExecCallback.failed(cause);
+ }
+
+ });
+ } catch (final HttpException | IOException ex) {
+ asyncExecCallback.failed(ex);
+ }
+ return;
+
+ case HttpRouteDirector.TUNNEL_PROXY:
+ // The most simple example for this case is a proxy chain
+ // of two proxies, where P1 must be tunnelled to P2.
+ // route: Source -> P1 -> P2 -> Target (3 hops)
+ // fact: Source -> P1 -> Target (2 hops)
+ asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
+ return;
+
+ case HttpRouteDirector.LAYER_PROTOCOL:
+ execRuntime.upgradeTls(clientContext);
+ log.debug("Upgraded to TLS");
+ tracker.layerProtocol(route.isSecure());
+ break;
+
+ case HttpRouteDirector.UNREACHABLE:
+ asyncExecCallback.failed(new HttpException("Unable to establish route: " +
+ "planned = " + route + "; current = " + fact));
+ return;
+
+ case HttpRouteDirector.COMPLETE:
+ log.debug("Route fully established");
+ try {
+ chain.proceed(request, entityProducer, scope, asyncExecCallback);
+ } catch (final HttpException | IOException ex) {
+ asyncExecCallback.failed(ex);
+ }
+ break;
+
+ default:
+ throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
+ }
+ } while (step > HttpRouteDirector.COMPLETE);
+ }
+
+ private void createTunnel(
+ final State state,
+ final HttpHost proxy,
+ final HttpHost nextHop,
+ final AsyncExecChain.Scope scope,
+ final AsyncExecChain chain,
+ final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+
+ final AsyncExecRuntime execRuntime = scope.execRuntime;
+ final HttpClientContext clientContext = scope.clientContext;
+
+ final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
+
+ final HttpRequest connect = new BasicHttpRequest("CONNECT", nextHop, nextHop.toHostString());
+ connect.setVersion(HttpVersion.HTTP_1_1);
+
+ proxyHttpProcessor.process(connect, null, clientContext);
+ authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
+
+ chain.proceed(connect, null, scope, new AsyncExecCallback() {
+
+ @Override
+ public AsyncDataConsumer handleResponse(
+ final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+
+ clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
+ proxyHttpProcessor.process(response, entityDetails, clientContext);
+
+ final int status = response.getCode();
+ if (status < HttpStatus.SC_SUCCESS) {
+ throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
+ }
+
+ if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
+ state.challenged = true;
+ return null;
+ } else {
+ state.challenged = false;
+ if (status >= HttpStatus.SC_REDIRECTION) {
+ state.tunnelRefused = true;
+ return asyncExecCallback.handleResponse(response, entityDetails);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ @Override
+ public void completed() {
+ if (!execRuntime.isConnected()) {
+ state.tracker.reset();
+ }
+ if (state.challenged) {
+ try {
+ createTunnel(state, proxy, nextHop, scope, chain, asyncExecCallback);
+ } catch (final HttpException | IOException ex) {
+ asyncExecCallback.failed(ex);
+ }
+ } else {
+ if (state.tunnelRefused) {
+ asyncExecCallback.failed(new TunnelRefusedException("Tunnel refused", null));
+ } else {
+ asyncExecCallback.completed();
+ }
+ }
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ asyncExecCallback.failed(cause);
+ }
+
+ });
+
+ }
+
+ private boolean needAuthentication(
+ final AuthExchange proxyAuthExchange,
+ final HttpHost proxy,
+ final HttpResponse response,
+ final HttpClientContext context) {
+ final RequestConfig config = context.getRequestConfig();
+ if (config.isAuthenticationEnabled()) {
+ final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
+ if (proxyAuthRequested) {
+ return authenticator.prepareAuthResponse(proxy, ChallengeType.PROXY, response,
+ proxyAuthStrategy, proxyAuthExchange, context);
+ }
+ }
+ return false;
+ }
+
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncExecRuntimeImpl.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncExecRuntimeImpl.java
index b2a0785..8c90c35 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncExecRuntimeImpl.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncExecRuntimeImpl.java
@@ -227,7 +227,7 @@
@Override
public void upgradeTls(final HttpClientContext context) {
final AsyncConnectionEndpoint endpoint = ensureValid();
- manager.upgrade(endpoint, context);
+ manager.upgrade(endpoint, versionPolicy, context);
}
@Override
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncMainClientExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncMainClientExec.java
index ccd464e..f015d62 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncMainClientExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncMainClientExec.java
@@ -40,7 +40,6 @@
import org.apache.hc.client5.http.async.AsyncExecRuntime;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.protocol.UserTokenHandler;
-import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
@@ -80,45 +79,6 @@
final HttpRoute route = scope.route;
final HttpClientContext clientContext = scope.clientContext;
final AsyncExecRuntime execRuntime = scope.execRuntime;
- if (!execRuntime.isConnectionAcquired()) {
- final Object userToken = clientContext.getUserToken();
- if (log.isDebugEnabled()) {
- log.debug(exchangeId + ": acquiring connection with route " + route);
- }
- execRuntime.acquireConnection(route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
-
- @Override
- public void completed(final AsyncExecRuntime execRuntime) {
- if (log.isDebugEnabled()) {
- log.debug(exchangeId + ": connection acquired");
- }
- execute(exchangeId, route, execRuntime, request, entityProducer, clientContext, asyncExecCallback);
- }
-
- @Override
- public void failed(final Exception ex) {
- asyncExecCallback.failed(ex);
- }
-
- @Override
- public void cancelled() {
- asyncExecCallback.failed(new InterruptedIOException());
- }
-
- });
- } else {
- execute(exchangeId, route, execRuntime, request, entityProducer, clientContext, asyncExecCallback);
- }
- }
-
- private void execute(
- final String exchangeId,
- final HttpRoute route,
- final AsyncExecRuntime execRuntime,
- final HttpRequest request,
- final AsyncEntityProducer entityProducer,
- final HttpClientContext clientContext,
- final AsyncExecCallback asyncExecCallback) {
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": executing " + new RequestLine(request));
@@ -186,6 +146,7 @@
} else {
execRuntime.validateConnection();
}
+ asyncExecCallback.completed();
}
}
@@ -227,6 +188,7 @@
}
};
+
execRuntime.execute(
log.isDebugEnabled() ? new LoggingAsyncClientExchangeHandler(log, exchangeId, internalExchangeHandler) : internalExchangeHandler,
clientContext);
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncProtocolExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncProtocolExec.java
index fb3ba8f..bf20005 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncProtocolExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncProtocolExec.java
@@ -191,7 +191,7 @@
@Override
public void completed() {
- if (execRuntime.isConnected()) {
+ if (!execRuntime.isConnected()) {
if (proxyAuthExchange.getState() == AuthExchange.State.SUCCESS
&& proxyAuthExchange.getAuthScheme() != null
&& proxyAuthExchange.getAuthScheme().isConnectionBased()) {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
index 07699a7..61351d7 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
@@ -100,9 +100,11 @@
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
+import org.apache.hc.core5.http.protocol.RequestTargetHost;
import org.apache.hc.core5.http.protocol.RequestUserAgent;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
@@ -735,6 +737,12 @@
}
}
+ execChainDefinition.addFirst(
+ new AsyncConnectExec(
+ new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
+ proxyAuthStrategyCopy),
+ ChainElements.CONNECT.name());
+
final HttpProcessorBuilder b = HttpProcessorBuilder.create();
if (requestInterceptors != null) {
for (final RequestInterceptorEntry entry: requestInterceptors) {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java
index 2a6b148..b2b764a 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java
@@ -157,15 +157,15 @@
return future;
}
- public void upgrade(final ManagedAsyncClientConnection connection, final HttpHost host) {
- final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getHostName()) : null;
+ public void upgrade(final ManagedAsyncClientConnection connection, final HttpHost host, final Object attachment) {
+ final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null;
if (tlsStrategy != null) {
tlsStrategy.upgrade(
connection,
host,
connection.getLocalAddress(),
connection.getRemoteAddress(),
- null);
+ attachment);
}
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
index 328c625..e7039ea 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
@@ -305,13 +305,14 @@
@Override
public void upgrade(
final AsyncConnectionEndpoint endpoint,
+ final Object attachment,
final HttpContext context) {
Args.notNull(endpoint, "Managed endpoint");
final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
final HttpRoute route = poolEntry.getRoute();
final ManagedAsyncClientConnection connection = poolEntry.getConnection();
- connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost());
+ connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(internalEndpoint) + ": upgraded " + ConnPoolSupport.getId(connection));
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ConnectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ConnectExec.java
index af8daf7..9836ee2 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ConnectExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ConnectExec.java
@@ -54,7 +54,6 @@
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.HttpVersion;
-import org.apache.hc.core5.http.io.entity.BufferedHttpEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
import org.apache.hc.core5.http.message.StatusLine;
@@ -237,13 +236,9 @@
// Buffer response content
final HttpEntity entity = response.getEntity();
- if (entity != null) {
- response.setEntity(new BufferedHttpEntity(entity));
- }
-
+ final String responseMessage = entity != null ? EntityUtils.toString(entity) : null;
execRuntime.disconnect();
- throw new TunnelRefusedException("CONNECT refused by proxy: " +
- new StatusLine(response), response);
+ throw new TunnelRefusedException("CONNECT refused by proxy: " + new StatusLine(response), responseMessage);
}
// How to decide on security of the tunnelled connection?
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ProxyClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ProxyClient.java
index 798d9ad..c7d2e63 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ProxyClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ProxyClient.java
@@ -67,9 +67,9 @@
import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
import org.apache.hc.core5.http.io.HttpConnectionFactory;
-import org.apache.hc.core5.http.io.entity.BufferedHttpEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
+import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.protocol.BasicHttpContext;
import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpContext;
@@ -207,12 +207,9 @@
// Buffer response content
final HttpEntity entity = response.getEntity();
- if (entity != null) {
- response.setEntity(new BufferedHttpEntity(entity));
- }
-
+ final String responseMessage = entity != null ? EntityUtils.toString(entity) : null;
conn.close();
- throw new TunnelRefusedException("CONNECT refused by proxy: " + response, response);
+ throw new TunnelRefusedException("CONNECT refused by proxy: " + new StatusLine(response), responseMessage);
}
return conn.getSocket();
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/TunnelRefusedException.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/TunnelRefusedException.java
index a187f76..fe78cd7 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/TunnelRefusedException.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/TunnelRefusedException.java
@@ -27,7 +27,6 @@
package org.apache.hc.client5.http.impl.sync;
-import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpException;
/**
@@ -39,15 +38,15 @@
private static final long serialVersionUID = -8646722842745617323L;
- private final ClassicHttpResponse response;
+ private final String responseMesage;
- public TunnelRefusedException(final String message, final ClassicHttpResponse response) {
+ public TunnelRefusedException(final String message, final String responseMesage) {
super(message);
- this.response = response;
+ this.responseMesage = responseMesage;
}
- public ClassicHttpResponse getResponse() {
- return this.response;
+ public String getResponseMessage() {
+ return this.responseMesage;
}
}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java
index 077999e..72aa376 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java
@@ -115,10 +115,12 @@
* Upgrades the endpoint's underlying transport to Transport Layer Security.
*
* @param endpoint the managed endpoint.
+ * @param attachment the attachment the upgrade attachment object.
* @param context the actual HTTP context.
*/
void upgrade(
AsyncConnectionEndpoint endpoint,
+ Object attachment,
HttpContext context);
}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/sync/TestConnectExec.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/sync/TestConnectExec.java
index 0d9817c..8410aa7 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/sync/TestConnectExec.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/sync/TestConnectExec.java
@@ -54,7 +54,6 @@
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpVersion;
-import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
import org.apache.hc.core5.http.protocol.HttpProcessor;
@@ -219,8 +218,7 @@
try {
exec.execute(request, scope, execChain);
} catch (final TunnelRefusedException ex) {
- final ClassicHttpResponse r = ex.getResponse();
- Assert.assertEquals("Ka-boom", EntityUtils.toString(r.getEntity()));
+ Assert.assertEquals("Ka-boom", ex.getResponseMessage());
Mockito.verify(execRuntime).disconnect();
Mockito.verify(execRuntime).discardConnection();
throw ex;