Implemented connection routing for the async request exec chain

git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@1794170 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/external/HttpAsyncClientCompatibilityTest.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/external/HttpAsyncClientCompatibilityTest.java
index e65defe..ffdb17a 100644
--- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/external/HttpAsyncClientCompatibilityTest.java
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/external/HttpAsyncClientCompatibilityTest.java
@@ -29,17 +29,15 @@
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.net.ssl.SSLContext;
 
 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
-import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
 import org.apache.hc.client5.http.auth.AuthScope;
 import org.apache.hc.client5.http.auth.Credentials;
+import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
 import org.apache.hc.client5.http.config.RequestConfig;
 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
 import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
@@ -48,6 +46,8 @@
 import org.apache.hc.client5.http.impl.sync.BasicCredentialsProvider;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
 import org.apache.hc.client5.http.ssl.H2TlsStrategy;
+import org.apache.hc.core5.http.HeaderElements;
+import org.apache.hc.core5.http.HttpHeaders;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpStatus;
@@ -55,19 +55,39 @@
 import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.ssl.SSLContexts;
 import org.apache.hc.core5.util.TextUtils;
+import org.apache.hc.core5.util.TimeValue;
+import org.apache.hc.core5.util.Timeout;
 
 public class HttpAsyncClientCompatibilityTest {
 
     public static void main(final String... args) throws Exception {
         final HttpAsyncClientCompatibilityTest[] tests = new HttpAsyncClientCompatibilityTest[] {
                 new HttpAsyncClientCompatibilityTest(
-                        HttpVersion.HTTP_1_1, new HttpHost("localhost", 8080, "http"), null, null),
+                        HttpVersion.HTTP_1_1,
+                        new HttpHost("localhost", 8080, "http"), null, null),
                 new HttpAsyncClientCompatibilityTest(
-                        HttpVersion.HTTP_2_0, new HttpHost("localhost", 8080, "http"), null, null),
+                        HttpVersion.HTTP_1_1,
+                        new HttpHost("test-httpd", 8080, "http"), new HttpHost("localhost", 8888), null),
                 new HttpAsyncClientCompatibilityTest(
-                        HttpVersion.HTTP_1_1, new HttpHost("localhost", 8443, "https"), null, null),
+                        HttpVersion.HTTP_1_1,
+                        new HttpHost("test-httpd", 8080, "http"), new HttpHost("localhost", 8889),
+                        new UsernamePasswordCredentials("squid", "nopassword".toCharArray())),
                 new HttpAsyncClientCompatibilityTest(
-                        HttpVersion.HTTP_2_0, new HttpHost("localhost", 8443, "https"), null, null)
+                        HttpVersion.HTTP_1_1,
+                        new HttpHost("localhost", 8443, "https"), null, null),
+                new HttpAsyncClientCompatibilityTest(
+                        HttpVersion.HTTP_1_1,
+                        new HttpHost("test-httpd", 8443, "https"), new HttpHost("localhost", 8888), null),
+                new HttpAsyncClientCompatibilityTest(
+                        HttpVersion.HTTP_1_1,
+                        new HttpHost("test-httpd", 8443, "https"), new HttpHost("localhost", 8889),
+                        new UsernamePasswordCredentials("squid", "nopassword".toCharArray()))
+//                new HttpAsyncClientCompatibilityTest(
+//                        HttpVersion.HTTP_2_0,
+//                        new HttpHost("localhost", 8080, "http"), null, null),
+//                new HttpAsyncClientCompatibilityTest(
+//                        HttpVersion.HTTP_2_0,
+//                        new HttpHost("localhost", 8443, "https"), null, null)
         };
         for (final HttpAsyncClientCompatibilityTest test: tests) {
             try {
@@ -78,6 +98,8 @@
         }
     }
 
+    private static final Timeout TIMEOUT = Timeout.ofSeconds(5);
+
     private final HttpVersion protocolVersion;
     private final HttpHost target;
     private final HttpHost proxy;
@@ -108,6 +130,7 @@
         this.client = HttpAsyncClients.custom()
                 .setVersionPolicy(this.protocolVersion == HttpVersion.HTTP_2 ? HttpVersionPolicy.FORCE_HTTP_2 : HttpVersionPolicy.FORCE_HTTP_1)
                 .setConnectionManager(this.connManager)
+                .setProxy(this.proxy)
                 .setDefaultRequestConfig(requestConfig)
                 .build();
     }
@@ -144,10 +167,10 @@
             final HttpClientContext context = HttpClientContext.create();
             context.setCredentialsProvider(credentialsProvider);
 
-            final SimpleHttpRequest options = new SimpleHttpRequest("OPTIONS", target, "*", null, null);
-            final Future<SimpleHttpResponse> future = client.execute(new SimpleRequestProducer(options), new SimpleResponseConsumer(), null);
+            final SimpleHttpRequest options = SimpleHttpRequest.options(target, "*");
+            final Future<SimpleHttpResponse> future = client.execute(options, context, null);
             try {
-                final SimpleHttpResponse response = future.get(5, TimeUnit.SECONDS);
+                final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
                 final int code = response.getCode();
                 if (code == HttpStatus.SC_OK) {
                     logResult(TestResult.OK, options, Objects.toString(response.getFirstHeader("server")));
@@ -163,15 +186,16 @@
         }
         // Basic GET requests
         {
+            connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
             final HttpClientContext context = HttpClientContext.create();
             context.setCredentialsProvider(credentialsProvider);
 
             final String[] requestUris = new String[] {"/", "/news.html", "/status.html"};
             for (String requestUri: requestUris) {
-                final SimpleHttpRequest httpGet = new SimpleHttpRequest("GET", target, requestUri, null, null);
-                final Future<SimpleHttpResponse> future = client.execute(new SimpleRequestProducer(httpGet), new SimpleResponseConsumer(), null);
+                final SimpleHttpRequest httpGet = SimpleHttpRequest.get(target, requestUri);
+                final Future<SimpleHttpResponse> future = client.execute(httpGet, context, null);
                 try {
-                    final SimpleHttpResponse response = future.get(5, TimeUnit.SECONDS);
+                    final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
                     final int code = response.getCode();
                     if (code == HttpStatus.SC_OK) {
                         logResult(TestResult.OK, httpGet, "200");
@@ -186,6 +210,112 @@
                 }
             }
         }
+        // Wrong target auth scope
+        {
+            connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
+            credentialsProvider.setCredentials(
+                    new AuthScope("otherhost", AuthScope.ANY_PORT, "Restricted Files"),
+                    new UsernamePasswordCredentials("testuser", "nopassword".toCharArray()));
+            final HttpClientContext context = HttpClientContext.create();
+            context.setCredentialsProvider(credentialsProvider);
+
+            final SimpleHttpRequest httpGetSecret = SimpleHttpRequest.get(target, "/private/big-secret.txt");
+            final Future<SimpleHttpResponse> future = client.execute(httpGetSecret, context, null);
+            try {
+                final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+                final int code = response.getCode();
+                if (code == HttpStatus.SC_UNAUTHORIZED) {
+                    logResult(TestResult.OK, httpGetSecret, "401 (wrong target auth scope)");
+                } else {
+                    logResult(TestResult.NOK, httpGetSecret, "(status " + code + ")");
+                }
+            } catch (ExecutionException ex) {
+                final Throwable cause = ex.getCause();
+                logResult(TestResult.NOK, httpGetSecret, "(" + cause.getMessage() + ")");
+            } catch (TimeoutException ex) {
+                logResult(TestResult.NOK, httpGetSecret, "(time out)");
+            }
+        }
+        // Wrong target credentials
+        {
+            connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
+            credentialsProvider.setCredentials(
+                    new AuthScope(target),
+                    new UsernamePasswordCredentials("testuser", "wrong password".toCharArray()));
+            final HttpClientContext context = HttpClientContext.create();
+            context.setCredentialsProvider(credentialsProvider);
+
+            final SimpleHttpRequest httpGetSecret = SimpleHttpRequest.get(target, "/private/big-secret.txt");
+            final Future<SimpleHttpResponse> future = client.execute(httpGetSecret, context, null);
+            try {
+                final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+                final int code = response.getCode();
+                if (code == HttpStatus.SC_UNAUTHORIZED) {
+                    logResult(TestResult.OK, httpGetSecret, "401 (wrong target creds)");
+                } else {
+                    logResult(TestResult.NOK, httpGetSecret, "(status " + code + ")");
+                }
+            } catch (ExecutionException ex) {
+                final Throwable cause = ex.getCause();
+                logResult(TestResult.NOK, httpGetSecret, "(" + cause.getMessage() + ")");
+            } catch (TimeoutException ex) {
+                logResult(TestResult.NOK, httpGetSecret, "(time out)");
+            }
+        }
+        // Correct target credentials
+        {
+            connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
+            credentialsProvider.setCredentials(
+                    new AuthScope(target),
+                    new UsernamePasswordCredentials("testuser", "nopassword".toCharArray()));
+            final HttpClientContext context = HttpClientContext.create();
+            context.setCredentialsProvider(credentialsProvider);
+
+            final SimpleHttpRequest httpGetSecret = SimpleHttpRequest.get(target, "/private/big-secret.txt");
+            final Future<SimpleHttpResponse> future = client.execute(httpGetSecret, context, null);
+            try {
+                final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+                final int code = response.getCode();
+                if (code == HttpStatus.SC_OK) {
+                    logResult(TestResult.OK, httpGetSecret, "200 (correct target creds)");
+                } else {
+                    logResult(TestResult.NOK, httpGetSecret, "(status " + code + ")");
+                }
+            } catch (ExecutionException ex) {
+                final Throwable cause = ex.getCause();
+                logResult(TestResult.NOK, httpGetSecret, "(" + cause.getMessage() + ")");
+            } catch (TimeoutException ex) {
+                logResult(TestResult.NOK, httpGetSecret, "(time out)");
+            }
+        }
+        // Correct target credentials (no keep-alive)
+        if (protocolVersion.lessEquals(HttpVersion.HTTP_1_1))
+        {
+            connManager.closeIdle(TimeValue.NEG_ONE_MILLISECONDS);
+            credentialsProvider.setCredentials(
+                    new AuthScope(target),
+                    new UsernamePasswordCredentials("testuser", "nopassword".toCharArray()));
+            final HttpClientContext context = HttpClientContext.create();
+            context.setCredentialsProvider(credentialsProvider);
+
+            final SimpleHttpRequest httpGetSecret = SimpleHttpRequest.get(target, "/private/big-secret.txt");
+            httpGetSecret.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
+            final Future<SimpleHttpResponse> future = client.execute(httpGetSecret, context, null);
+            try {
+                final SimpleHttpResponse response = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
+                final int code = response.getCode();
+                if (code == HttpStatus.SC_OK) {
+                    logResult(TestResult.OK, httpGetSecret, "200 (correct target creds / no keep-alive)");
+                } else {
+                    logResult(TestResult.NOK, httpGetSecret, "(status " + code + ")");
+                }
+            } catch (ExecutionException ex) {
+                final Throwable cause = ex.getCause();
+                logResult(TestResult.NOK, httpGetSecret, "(" + cause.getMessage() + ")");
+            } catch (TimeoutException ex) {
+                logResult(TestResult.NOK, httpGetSecret, "(time out)");
+            }
+        }
     }
 
 }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java
new file mode 100644
index 0000000..c2dea36
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java
@@ -0,0 +1,394 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.client5.http.impl.async;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.hc.client5.http.HttpRoute;
+import org.apache.hc.client5.http.RouteTracker;
+import org.apache.hc.client5.http.async.AsyncExecCallback;
+import org.apache.hc.client5.http.async.AsyncExecChain;
+import org.apache.hc.client5.http.async.AsyncExecChainHandler;
+import org.apache.hc.client5.http.async.AsyncExecRuntime;
+import org.apache.hc.client5.http.auth.AuthExchange;
+import org.apache.hc.client5.http.auth.ChallengeType;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.auth.HttpAuthenticator;
+import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
+import org.apache.hc.client5.http.impl.sync.TunnelRefusedException;
+import org.apache.hc.client5.http.protocol.AuthenticationStrategy;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.client5.http.routing.HttpRouteDirector;
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.HttpVersion;
+import org.apache.hc.core5.http.message.BasicHttpRequest;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
+import org.apache.hc.core5.http.protocol.HttpProcessor;
+import org.apache.hc.core5.util.Args;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Request executor in the HTTP request execution chain
+ * that is responsible for establishing connection to the target
+ * origin server as specified by the current route.
+ *
+ * @since 5.0
+ */
+@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
+public final class AsyncConnectExec implements AsyncExecChainHandler {
+
+    private final Logger log = LogManager.getLogger(getClass());
+
+    private final HttpProcessor proxyHttpProcessor;
+    private final AuthenticationStrategy proxyAuthStrategy;
+    private final HttpAuthenticator authenticator;
+    private final HttpRouteDirector routeDirector;
+
+    public AsyncConnectExec(
+            final HttpProcessor proxyHttpProcessor,
+            final AuthenticationStrategy proxyAuthStrategy) {
+        Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
+        Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
+        this.proxyHttpProcessor = proxyHttpProcessor;
+        this.proxyAuthStrategy  = proxyAuthStrategy;
+        this.authenticator      = new HttpAuthenticator();
+        this.routeDirector      = new BasicRouteDirector();
+    }
+
+    static class State {
+
+        State(final HttpRoute route) {
+            tracker = new RouteTracker(route);
+        }
+
+        final RouteTracker tracker;
+
+        volatile boolean challenged;
+        volatile boolean tunnelRefused;
+
+    }
+
+    @Override
+    public void execute(
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecChain chain,
+            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+        Args.notNull(request, "HTTP request");
+        Args.notNull(scope, "Scope");
+
+        final String exchangeId = scope.exchangeId;
+        final HttpRoute route = scope.route;
+        final HttpClientContext clientContext = scope.clientContext;
+        final AsyncExecRuntime execRuntime = scope.execRuntime;
+        final State state = new State(route);
+
+        final Runnable routeInitiation = new Runnable() {
+
+            @Override
+            public void run() {
+                if (log.isDebugEnabled()) {
+                    log.debug(exchangeId + ": connection acquired");
+                }
+                if (execRuntime.isConnected()) {
+                    try {
+                        chain.proceed(request, entityProducer, scope, asyncExecCallback);
+                    } catch (final HttpException | IOException ex) {
+                        asyncExecCallback.failed(ex);
+                    }
+                } else {
+                    proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
+                }
+            }
+
+        };
+
+        if (!execRuntime.isConnectionAcquired()) {
+            final Object userToken = clientContext.getUserToken();
+            if (log.isDebugEnabled()) {
+                log.debug(exchangeId + ": acquiring connection with route " + route);
+            }
+            execRuntime.acquireConnection(route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
+
+                @Override
+                public void completed(final AsyncExecRuntime execRuntime) {
+                    routeInitiation.run();
+                }
+
+                @Override
+                public void failed(final Exception ex) {
+                    asyncExecCallback.failed(ex);
+                }
+
+                @Override
+                public void cancelled() {
+                    asyncExecCallback.failed(new InterruptedIOException());
+                }
+
+            });
+        } else {
+            routeInitiation.run();
+        }
+
+    }
+
+    private void proceedToNextHop(
+            final State state,
+            final HttpRequest request,
+            final AsyncEntityProducer entityProducer,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecChain chain,
+            final AsyncExecCallback asyncExecCallback) {
+        final RouteTracker tracker = state.tracker;
+        final AsyncExecRuntime execRuntime = scope.execRuntime;
+        final HttpRoute route = scope.route;
+        final HttpClientContext clientContext = scope.clientContext;
+
+        int step;
+        do {
+            final HttpRoute fact = tracker.toRoute();
+            step = routeDirector.nextStep(route, fact);
+            switch (step) {
+                case HttpRouteDirector.CONNECT_TARGET:
+                    execRuntime.connect(clientContext, new FutureCallback<AsyncExecRuntime>() {
+
+                        @Override
+                        public void completed(final AsyncExecRuntime execRuntime) {
+                            tracker.connectTarget(route.isSecure());
+                            log.debug("Connected to target");
+                            proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
+                        }
+
+                        @Override
+                        public void failed(final Exception ex) {
+                            asyncExecCallback.failed(ex);
+                        }
+
+                        @Override
+                        public void cancelled() {
+                            asyncExecCallback.failed(new InterruptedIOException());
+                        }
+
+                    });
+                    return;
+
+                case HttpRouteDirector.CONNECT_PROXY:
+                    execRuntime.connect(clientContext, new FutureCallback<AsyncExecRuntime>() {
+
+                        @Override
+                        public void completed(final AsyncExecRuntime execRuntime) {
+                            final HttpHost proxy  = route.getProxyHost();
+                            tracker.connectProxy(proxy, false);
+                            log.debug("Connected to proxy");
+                            proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
+                        }
+
+                        @Override
+                        public void failed(final Exception ex) {
+                            asyncExecCallback.failed(ex);
+                        }
+
+                        @Override
+                        public void cancelled() {
+                            asyncExecCallback.failed(new InterruptedIOException());
+                        }
+
+                    });
+                    return;
+
+                case HttpRouteDirector.TUNNEL_TARGET:
+                    try {
+                        final HttpHost proxy = route.getProxyHost();
+                        final HttpHost target = route.getTargetHost();
+                        createTunnel(state, proxy ,target, scope, chain, new AsyncExecCallback() {
+
+                            @Override
+                            public AsyncDataConsumer handleResponse(
+                                    final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+                                return asyncExecCallback.handleResponse(response, entityDetails);
+                            }
+
+                            @Override
+                            public void completed() {
+                                log.debug("Tunnel to target created");
+                                tracker.tunnelTarget(false);
+                                proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
+                            }
+
+                            @Override
+                            public void failed(final Exception cause) {
+                                asyncExecCallback.failed(cause);
+                            }
+
+                        });
+                    } catch (final HttpException | IOException ex) {
+                        asyncExecCallback.failed(ex);
+                    }
+                    return;
+
+                case HttpRouteDirector.TUNNEL_PROXY:
+                    // The most simple example for this case is a proxy chain
+                    // of two proxies, where P1 must be tunnelled to P2.
+                    // route: Source -> P1 -> P2 -> Target (3 hops)
+                    // fact:  Source -> P1 -> Target       (2 hops)
+                    asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
+                    return;
+
+                case HttpRouteDirector.LAYER_PROTOCOL:
+                    execRuntime.upgradeTls(clientContext);
+                    log.debug("Upgraded to TLS");
+                    tracker.layerProtocol(route.isSecure());
+                    break;
+
+                case HttpRouteDirector.UNREACHABLE:
+                    asyncExecCallback.failed(new HttpException("Unable to establish route: " +
+                            "planned = " + route + "; current = " + fact));
+                    return;
+
+                case HttpRouteDirector.COMPLETE:
+                    log.debug("Route fully established");
+                    try {
+                        chain.proceed(request, entityProducer, scope, asyncExecCallback);
+                    } catch (final HttpException | IOException ex) {
+                        asyncExecCallback.failed(ex);
+                    }
+                    break;
+
+                default:
+                    throw new IllegalStateException("Unknown step indicator "  + step + " from RouteDirector.");
+            }
+        } while (step > HttpRouteDirector.COMPLETE);
+    }
+
+    private void createTunnel(
+            final State state,
+            final HttpHost proxy,
+            final HttpHost nextHop,
+            final AsyncExecChain.Scope scope,
+            final AsyncExecChain chain,
+            final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
+
+        final AsyncExecRuntime execRuntime = scope.execRuntime;
+        final HttpClientContext clientContext = scope.clientContext;
+
+        final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
+
+        final HttpRequest connect = new BasicHttpRequest("CONNECT", nextHop, nextHop.toHostString());
+        connect.setVersion(HttpVersion.HTTP_1_1);
+
+        proxyHttpProcessor.process(connect, null, clientContext);
+        authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
+
+        chain.proceed(connect, null, scope, new AsyncExecCallback() {
+
+            @Override
+            public AsyncDataConsumer handleResponse(
+                    final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
+
+                clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
+                proxyHttpProcessor.process(response, entityDetails, clientContext);
+
+                final int status = response.getCode();
+                if (status < HttpStatus.SC_SUCCESS) {
+                    throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
+                }
+
+                if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
+                    state.challenged = true;
+                    return null;
+                } else {
+                    state.challenged = false;
+                    if (status >= HttpStatus.SC_REDIRECTION) {
+                        state.tunnelRefused = true;
+                        return asyncExecCallback.handleResponse(response, entityDetails);
+                    } else {
+                        return null;
+                    }
+                }
+            }
+
+            @Override
+            public void completed() {
+                if (!execRuntime.isConnected()) {
+                    state.tracker.reset();
+                }
+                if (state.challenged) {
+                    try {
+                        createTunnel(state, proxy, nextHop, scope, chain, asyncExecCallback);
+                    } catch (final HttpException | IOException ex) {
+                        asyncExecCallback.failed(ex);
+                    }
+                } else {
+                    if (state.tunnelRefused) {
+                        asyncExecCallback.failed(new TunnelRefusedException("Tunnel refused", null));
+                    } else {
+                        asyncExecCallback.completed();
+                    }
+                }
+            }
+
+            @Override
+            public void failed(final Exception cause) {
+                asyncExecCallback.failed(cause);
+            }
+
+        });
+
+    }
+
+    private boolean needAuthentication(
+            final AuthExchange proxyAuthExchange,
+            final HttpHost proxy,
+            final HttpResponse response,
+            final HttpClientContext context) {
+        final RequestConfig config = context.getRequestConfig();
+        if (config.isAuthenticationEnabled()) {
+            final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
+            if (proxyAuthRequested) {
+                return authenticator.prepareAuthResponse(proxy, ChallengeType.PROXY, response,
+                        proxyAuthStrategy, proxyAuthExchange, context);
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncExecRuntimeImpl.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncExecRuntimeImpl.java
index b2a0785..8c90c35 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncExecRuntimeImpl.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncExecRuntimeImpl.java
@@ -227,7 +227,7 @@
     @Override
     public void upgradeTls(final HttpClientContext context) {
         final AsyncConnectionEndpoint endpoint = ensureValid();
-        manager.upgrade(endpoint, context);
+        manager.upgrade(endpoint, versionPolicy, context);
     }
 
     @Override
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncMainClientExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncMainClientExec.java
index ccd464e..f015d62 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncMainClientExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncMainClientExec.java
@@ -40,7 +40,6 @@
 import org.apache.hc.client5.http.async.AsyncExecRuntime;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
 import org.apache.hc.client5.http.protocol.UserTokenHandler;
-import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.http.EntityDetails;
 import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpException;
@@ -80,45 +79,6 @@
         final HttpRoute route = scope.route;
         final HttpClientContext clientContext = scope.clientContext;
         final AsyncExecRuntime execRuntime = scope.execRuntime;
-        if (!execRuntime.isConnectionAcquired()) {
-            final Object userToken = clientContext.getUserToken();
-            if (log.isDebugEnabled()) {
-                log.debug(exchangeId + ": acquiring connection with route " + route);
-            }
-            execRuntime.acquireConnection(route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
-
-                @Override
-                public void completed(final AsyncExecRuntime execRuntime) {
-                    if (log.isDebugEnabled()) {
-                        log.debug(exchangeId + ": connection acquired");
-                    }
-                    execute(exchangeId, route, execRuntime, request, entityProducer, clientContext, asyncExecCallback);
-                }
-
-                @Override
-                public void failed(final Exception ex) {
-                    asyncExecCallback.failed(ex);
-                }
-
-                @Override
-                public void cancelled() {
-                    asyncExecCallback.failed(new InterruptedIOException());
-                }
-
-            });
-        } else {
-            execute(exchangeId, route, execRuntime, request, entityProducer, clientContext, asyncExecCallback);
-        }
-    }
-
-    private void execute(
-            final String exchangeId,
-            final HttpRoute route,
-            final AsyncExecRuntime execRuntime,
-            final HttpRequest request,
-            final AsyncEntityProducer entityProducer,
-            final HttpClientContext clientContext,
-            final AsyncExecCallback asyncExecCallback) {
 
         if (log.isDebugEnabled()) {
             log.debug(exchangeId + ": executing " + new RequestLine(request));
@@ -186,6 +146,7 @@
                     } else {
                         execRuntime.validateConnection();
                     }
+                    asyncExecCallback.completed();
                 }
             }
 
@@ -227,6 +188,7 @@
             }
 
         };
+
         execRuntime.execute(
                 log.isDebugEnabled() ? new LoggingAsyncClientExchangeHandler(log, exchangeId, internalExchangeHandler) : internalExchangeHandler,
                 clientContext);
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncProtocolExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncProtocolExec.java
index fb3ba8f..bf20005 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncProtocolExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncProtocolExec.java
@@ -191,7 +191,7 @@
 
             @Override
             public void completed() {
-                if (execRuntime.isConnected()) {
+                if (!execRuntime.isConnected()) {
                     if (proxyAuthExchange.getState() == AuthExchange.State.SUCCESS
                             && proxyAuthExchange.getAuthScheme() != null
                             && proxyAuthExchange.getAuthScheme().isConnectionBased()) {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
index 07699a7..61351d7 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java
@@ -100,9 +100,11 @@
 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
 import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
+import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
 import org.apache.hc.core5.http.protocol.HttpContext;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
 import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
+import org.apache.hc.core5.http.protocol.RequestTargetHost;
 import org.apache.hc.core5.http.protocol.RequestUserAgent;
 import org.apache.hc.core5.http2.HttpVersionPolicy;
 import org.apache.hc.core5.http2.config.H2Config;
@@ -735,6 +737,12 @@
             }
         }
 
+        execChainDefinition.addFirst(
+                new AsyncConnectExec(
+                        new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
+                        proxyAuthStrategyCopy),
+                ChainElements.CONNECT.name());
+
         final HttpProcessorBuilder b = HttpProcessorBuilder.create();
         if (requestInterceptors != null) {
             for (final RequestInterceptorEntry entry: requestInterceptors) {
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java
index 2a6b148..b2b764a 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java
@@ -157,15 +157,15 @@
         return future;
     }
 
-    public void upgrade(final ManagedAsyncClientConnection connection, final HttpHost host) {
-        final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getHostName()) : null;
+    public void upgrade(final ManagedAsyncClientConnection connection, final HttpHost host, final Object attachment) {
+        final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null;
         if (tlsStrategy != null) {
             tlsStrategy.upgrade(
                     connection,
                     host,
                     connection.getLocalAddress(),
                     connection.getRemoteAddress(),
-                    null);
+                    attachment);
         }
 
     }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
index 328c625..e7039ea 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java
@@ -305,13 +305,14 @@
     @Override
     public void upgrade(
             final AsyncConnectionEndpoint endpoint,
+            final Object attachment,
             final HttpContext context) {
         Args.notNull(endpoint, "Managed endpoint");
         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
         final HttpRoute route = poolEntry.getRoute();
         final ManagedAsyncClientConnection connection = poolEntry.getConnection();
-        connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost());
+        connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
         if (log.isDebugEnabled()) {
             log.debug(ConnPoolSupport.getId(internalEndpoint) + ": upgraded " + ConnPoolSupport.getId(connection));
         }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ConnectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ConnectExec.java
index af8daf7..9836ee2 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ConnectExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ConnectExec.java
@@ -54,7 +54,6 @@
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.HttpVersion;
-import org.apache.hc.core5.http.io.entity.BufferedHttpEntity;
 import org.apache.hc.core5.http.io.entity.EntityUtils;
 import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
 import org.apache.hc.core5.http.message.StatusLine;
@@ -237,13 +236,9 @@
 
             // Buffer response content
             final HttpEntity entity = response.getEntity();
-            if (entity != null) {
-                response.setEntity(new BufferedHttpEntity(entity));
-            }
-
+            final String responseMessage = entity != null ? EntityUtils.toString(entity) : null;
             execRuntime.disconnect();
-            throw new TunnelRefusedException("CONNECT refused by proxy: " +
-                    new StatusLine(response), response);
+            throw new TunnelRefusedException("CONNECT refused by proxy: " + new StatusLine(response), responseMessage);
         }
 
         // How to decide on security of the tunnelled connection?
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ProxyClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ProxyClient.java
index 798d9ad..c7d2e63 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ProxyClient.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/ProxyClient.java
@@ -67,9 +67,9 @@
 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
 import org.apache.hc.core5.http.io.HttpConnectionFactory;
-import org.apache.hc.core5.http.io.entity.BufferedHttpEntity;
 import org.apache.hc.core5.http.io.entity.EntityUtils;
 import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
+import org.apache.hc.core5.http.message.StatusLine;
 import org.apache.hc.core5.http.protocol.BasicHttpContext;
 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
 import org.apache.hc.core5.http.protocol.HttpContext;
@@ -207,12 +207,9 @@
 
             // Buffer response content
             final HttpEntity entity = response.getEntity();
-            if (entity != null) {
-                response.setEntity(new BufferedHttpEntity(entity));
-            }
-
+            final String responseMessage = entity != null ? EntityUtils.toString(entity) : null;
             conn.close();
-            throw new TunnelRefusedException("CONNECT refused by proxy: " + response, response);
+            throw new TunnelRefusedException("CONNECT refused by proxy: " + new StatusLine(response), responseMessage);
         }
         return conn.getSocket();
     }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/TunnelRefusedException.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/TunnelRefusedException.java
index a187f76..fe78cd7 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/TunnelRefusedException.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/sync/TunnelRefusedException.java
@@ -27,7 +27,6 @@
 
 package org.apache.hc.client5.http.impl.sync;
 
-import org.apache.hc.core5.http.ClassicHttpResponse;
 import org.apache.hc.core5.http.HttpException;
 
 /**
@@ -39,15 +38,15 @@
 
     private static final long serialVersionUID = -8646722842745617323L;
 
-    private final ClassicHttpResponse response;
+    private final String responseMesage;
 
-    public TunnelRefusedException(final String message, final ClassicHttpResponse response) {
+    public TunnelRefusedException(final String message, final String responseMesage) {
         super(message);
-        this.response = response;
+        this.responseMesage = responseMesage;
     }
 
-    public ClassicHttpResponse getResponse() {
-        return this.response;
+    public String getResponseMessage() {
+        return this.responseMesage;
     }
 
 }
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java
index 077999e..72aa376 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java
@@ -115,10 +115,12 @@
      * Upgrades the endpoint's underlying transport to Transport Layer Security.
      *
      * @param endpoint      the managed endpoint.
+     * @param attachment the attachment the upgrade attachment object.
      * @param context the actual HTTP context.
      */
     void upgrade(
             AsyncConnectionEndpoint endpoint,
+            Object attachment,
             HttpContext context);
 
 }
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/sync/TestConnectExec.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/sync/TestConnectExec.java
index 0d9817c..8410aa7 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/sync/TestConnectExec.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/sync/TestConnectExec.java
@@ -54,7 +54,6 @@
 import org.apache.hc.core5.http.HttpRequest;
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.HttpVersion;
-import org.apache.hc.core5.http.io.entity.EntityUtils;
 import org.apache.hc.core5.http.io.entity.StringEntity;
 import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
 import org.apache.hc.core5.http.protocol.HttpProcessor;
@@ -219,8 +218,7 @@
         try {
             exec.execute(request, scope, execChain);
         } catch (final TunnelRefusedException ex) {
-            final ClassicHttpResponse r = ex.getResponse();
-            Assert.assertEquals("Ka-boom", EntityUtils.toString(r.getEntity()));
+            Assert.assertEquals("Ka-boom", ex.getResponseMessage());
             Mockito.verify(execRuntime).disconnect();
             Mockito.verify(execRuntime).discardConnection();
             throw ex;