blob: 8eaa3cc4e115a38a264352bb9e9e688145036e49 [file] [log] [blame]
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl.nio.client;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.http.ConnectionClosedException;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpRequestWrapper;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.routing.RouteTracker;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.util.Asserts;
/**
 * Abstract {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler} class
 * that implements connection management aspects shared by all HTTP exchange handlers.
 * <p>
 * Instances of this class are expected to be accessed by one thread at a time only.
 * The {@link #cancel()} method can be called concurrently by multiple threads.
 *
 * @param <T> type parameter of the result future held by this handler.
 */
abstract class AbstractClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {

    /** Source of the numeric ids used to tag log messages of individual exchanges. */
    private static final AtomicLong COUNTER = new AtomicLong(1);

    protected final Log log;

    private final long id;
    private final HttpClientContext localContext;
    private final BasicFuture<T> resultFuture;
    private final NHttpClientConnectionManager connmgr;
    private final ConnectionReuseStrategy connReuseStrategy;
    private final ConnectionKeepAliveStrategy keepaliveStrategy;
    /** Connection currently leased by this exchange, or {@code null} if none. */
    private final AtomicReference<NHttpClientConnection> managedConnRef;
    /** Route the exchange is meant to be executed over. */
    private final AtomicReference<HttpRoute> routeRef;
    /** Tracker of the route being built; {@code null} when no routing is in progress. */
    private final AtomicReference<RouteTracker> routeTrackerRef;
    private final AtomicBoolean routeEstablished;
    /**
     * Keep-alive duration in milliseconds as passed to the connection manager on release;
     * {@code null} means the connection gets closed instead of being kept alive.
     */
    private final AtomicReference<Long> validDurationRef;
    private final AtomicReference<HttpRequestWrapper> requestRef;
    private final AtomicReference<HttpResponse> responseRef;
    private final AtomicBoolean completed;
    /** Set exactly once by whichever of {@link #close()}, {@link #failed(Exception)} or {@link #cancel()} wins. */
    private final AtomicBoolean closed;

    /**
     * Creates a handler with no connection, no route and no current request / response.
     *
     * @param log logger used for debug output.
     * @param localContext execution context; supplies the user token and the request config.
     * @param resultFuture future that gets failed or cancelled by this handler.
     * @param connmgr connection manager used to lease, route and release connections.
     * @param connReuseStrategy strategy deciding whether a connection can be kept alive.
     * @param keepaliveStrategy strategy deciding for how long a connection can be kept alive.
     */
    AbstractClientExchangeHandler(
            final Log log,
            final HttpClientContext localContext,
            final BasicFuture<T> resultFuture,
            final NHttpClientConnectionManager connmgr,
            final ConnectionReuseStrategy connReuseStrategy,
            final ConnectionKeepAliveStrategy keepaliveStrategy) {
        super();
        this.log = log;
        this.id = COUNTER.getAndIncrement();
        this.localContext = localContext;
        this.resultFuture = resultFuture;
        this.connmgr = connmgr;
        this.connReuseStrategy = connReuseStrategy;
        this.keepaliveStrategy = keepaliveStrategy;
        this.managedConnRef = new AtomicReference<NHttpClientConnection>(null);
        this.routeRef = new AtomicReference<HttpRoute>(null);
        this.routeTrackerRef = new AtomicReference<RouteTracker>(null);
        this.routeEstablished = new AtomicBoolean(false);
        this.validDurationRef = new AtomicReference<Long>(null);
        this.requestRef = new AtomicReference<HttpRequestWrapper>(null);
        this.responseRef = new AtomicReference<HttpResponse>(null);
        this.completed = new AtomicBoolean(false);
        this.closed = new AtomicBoolean(false);
    }

    /** Returns the id assigned to this exchange at construction time. */
    final long getId() {
        return this.id;
    }

    /** Returns {@code true} once {@link #markCompleted()} has been called. */
    final boolean isCompleted() {
        return this.completed.get();
    }

    /** Flags the exchange as completed. */
    final void markCompleted() {
        this.completed.set(true);
    }

    /**
     * Clears the recorded keep-alive duration so that {@link #releaseConnection()}
     * closes the connection instead of handing it back for re-use.
     */
    final void markConnectionNonReusable() {
        this.validDurationRef.set(null);
    }

    /** Returns {@code true} if the route has been recorded as established. */
    final boolean isRouteEstablished() {
        return this.routeEstablished.get();
    }

    /** Returns the planned route, or {@code null} if none has been set. */
    final HttpRoute getRoute() {
        return this.routeRef.get();
    }

    /** Sets the planned route. */
    final void setRoute(final HttpRoute route) {
        this.routeRef.set(route);
    }

    /** Returns the current request, or {@code null} if none has been set. */
    final HttpRequestWrapper getCurrentRequest() {
        return this.requestRef.get();
    }

    /** Sets the current request. */
    final void setCurrentRequest(final HttpRequestWrapper request) {
        this.requestRef.set(request);
    }

    /** Returns the current response, or {@code null} if none has been set. */
    final HttpResponse getCurrentResponse() {
        return this.responseRef.get();
    }

    /** Sets the current response. */
    final void setCurrentResponse(final HttpResponse response) {
        this.responseRef.set(response);
    }

    /**
     * Returns the route as recorded so far by the route tracker, or {@code null}
     * if there is no route tracker at the moment.
     */
    final HttpRoute getActualRoute() {
        final RouteTracker routeTracker = this.routeTrackerRef.get();
        return routeTracker != null ? routeTracker.toRoute() : null;
    }

    /**
     * Determines the routing state of the managed connection, unless the route is already
     * marked as established or a route tracker already exists. If the connection manager
     * reports the route as incomplete a new {@link RouteTracker} for the planned route
     * is installed.
     *
     * @throws IllegalStateException if the state has to be determined but there is
     *   no managed connection (raised by {@link Asserts#check(boolean, String)}).
     */
    final void verifytRoute() {
        if (!this.routeEstablished.get() && this.routeTrackerRef.get() == null) {
            final NHttpClientConnection managedConn = requireConnection();
            final boolean routeComplete = this.connmgr.isRouteComplete(managedConn);
            this.routeEstablished.set(routeComplete);
            if (!routeComplete) {
                this.log.debug("Start connection routing");
                final HttpRoute route = this.routeRef.get();
                this.routeTrackerRef.set(new RouteTracker(route));
            } else {
                this.log.debug("Connection route already established");
            }
        }
    }

    /**
     * Starts the route on the managed connection and records a direct connection
     * to the target in the route tracker.
     *
     * @throws IOException if thrown by the connection manager.
     * @throws IllegalStateException if the connection, the route or the route tracker is missing.
     */
    final void onRouteToTarget() throws IOException {
        final NHttpClientConnection managedConn = requireConnection();
        final HttpRoute route = requireRoute();
        final RouteTracker routeTracker = requireRouteTracker();
        this.connmgr.startRoute(managedConn, route, this.localContext);
        routeTracker.connectTarget(route.isSecure());
    }

    /**
     * Starts the route on the managed connection and records a non-secure connection
     * to the route's proxy host in the route tracker.
     *
     * @throws IOException if thrown by the connection manager.
     * @throws IllegalStateException if the connection, the route or the route tracker is missing.
     */
    final void onRouteToProxy() throws IOException {
        final NHttpClientConnection managedConn = requireConnection();
        final HttpRoute route = requireRoute();
        final RouteTracker routeTracker = requireRouteTracker();
        this.connmgr.startRoute(managedConn, route, this.localContext);
        final HttpHost proxy = route.getProxyHost();
        routeTracker.connectProxy(proxy, false);
    }

    /**
     * Asks the connection manager to upgrade the managed connection and records
     * the layered protocol in the route tracker.
     *
     * @throws IOException if thrown by the connection manager.
     * @throws IllegalStateException if the connection, the route or the route tracker is missing.
     */
    final void onRouteUpgrade() throws IOException {
        final NHttpClientConnection managedConn = requireConnection();
        final HttpRoute route = requireRoute();
        final RouteTracker routeTracker = requireRouteTracker();
        this.connmgr.upgrade(managedConn, route, this.localContext);
        routeTracker.layerProtocol(route.isSecure());
    }

    /**
     * Records a non-secure tunnel to the target in the route tracker.
     *
     * @throws IllegalStateException if there is no route tracker.
     */
    final void onRouteTunnelToTarget() {
        final RouteTracker routeTracker = requireRouteTracker();
        routeTracker.tunnelTarget(false);
    }

    /**
     * Notifies the connection manager that the route is complete, marks the route
     * as established and drops the route tracker.
     *
     * @throws IllegalStateException if the connection or the route is missing.
     */
    final void onRouteComplete() {
        final NHttpClientConnection managedConn = requireConnection();
        final HttpRoute route = requireRoute();
        this.connmgr.routeComplete(managedConn, route, this.localContext);
        this.routeEstablished.set(true);
        this.routeTrackerRef.set(null);
    }

    /** Returns the managed connection, or {@code null} if none is currently held. */
    final NHttpClientConnection getConnection() {
        return this.managedConnRef.get();
    }

    /**
     * Detaches the managed connection (if any) from this handler and gives it back to the
     * connection manager. With a recorded keep-alive duration the connection is released
     * together with the context's user token and that duration; without one it is closed
     * first and then released with no state and zero validity.
     */
    final void releaseConnection() {
        // getAndSet guarantees that a given connection is released at most once.
        final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
        if (localConn != null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("[exchange: " + this.id + "] releasing connection");
            }
            localConn.getContext().removeAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER);
            final Long validDuration = this.validDurationRef.get();
            if (validDuration != null) {
                final Object userToken = this.localContext.getUserToken();
                this.connmgr.releaseConnection(localConn, userToken, validDuration, TimeUnit.MILLISECONDS);
            } else {
                try {
                    localConn.close();
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("[exchange: " + this.id + "] connection discarded");
                    }
                } catch (final IOException ex) {
                    // Best effort: the connection is being thrown away regardless.
                    if (this.log.isDebugEnabled()) {
                        this.log.debug(ex.getMessage(), ex);
                    }
                } finally {
                    this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    /**
     * Detaches the managed connection (if any) from this handler, shuts it down and
     * releases it to the connection manager with no state and zero validity.
     */
    final void discardConnection() {
        // getAndSet guarantees that a given connection is discarded at most once.
        final NHttpClientConnection localConn = this.managedConnRef.getAndSet(null);
        if (localConn != null) {
            try {
                localConn.shutdown();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("[exchange: " + this.id + "] connection aborted");
                }
            } catch (final IOException ex) {
                // Best effort: the connection is being thrown away regardless.
                if (this.log.isDebugEnabled()) {
                    this.log.debug(ex.getMessage(), ex);
                }
            } finally {
                this.connmgr.releaseConnection(localConn, null, 0, TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Decides whether the managed connection can be kept alive after the current response
     * and records the outcome for a later {@link #releaseConnection()}.
     *
     * @return {@code true} if the connection is open and the re-use strategy allows keeping
     *   it alive, {@code false} otherwise.
     * @throws IllegalStateException if there is no current response or no managed connection.
     */
    final boolean manageConnectionPersistence() {
        final HttpResponse response = this.responseRef.get();
        Asserts.check(response != null, "Inconsistent state: HTTP response");
        final NHttpClientConnection managedConn = requireConnection();
        final boolean keepAlive = managedConn.isOpen() &&
                this.connReuseStrategy.keepAlive(response, this.localContext);
        if (keepAlive) {
            final long validDuration = this.keepaliveStrategy.getKeepAliveDuration(
                    response, this.localContext);
            if (this.log.isDebugEnabled()) {
                final String s;
                if (validDuration > 0) {
                    s = "for " + validDuration + " " + TimeUnit.MILLISECONDS;
                } else {
                    s = "indefinitely";
                }
                this.log.debug("[exchange: " + this.id + "] Connection can be kept alive " + s);
            }
            this.validDurationRef.set(validDuration);
        } else {
            if (this.log.isDebugEnabled()) {
                this.log.debug("[exchange: " + this.id + "] Connection cannot be kept alive");
            }
            this.validDurationRef.set(null);
        }
        return keepAlive;
    }

    /**
     * Returns the managed connection.
     *
     * @throws IllegalStateException if there is no managed connection.
     */
    private NHttpClientConnection requireConnection() {
        final NHttpClientConnection managedConn = this.managedConnRef.get();
        Asserts.check(managedConn != null, "Inconsistent state: managed connection is null");
        return managedConn;
    }

    /**
     * Returns the planned route.
     *
     * @throws IllegalStateException if no route has been set.
     */
    private HttpRoute requireRoute() {
        final HttpRoute route = this.routeRef.get();
        Asserts.check(route != null, "Inconsistent state: HTTP route is null");
        return route;
    }

    /**
     * Returns the current route tracker.
     *
     * @throws IllegalStateException if there is no route tracker.
     */
    private RouteTracker requireRouteTracker() {
        final RouteTracker routeTracker = this.routeTrackerRef.get();
        Asserts.check(routeTracker != null, "Inconsistent state: HTTP route tracker");
        return routeTracker;
    }

    /**
     * Takes ownership of a connection delivered by the connection manager. If the handler
     * has been closed in the meantime the connection is discarded right away. Otherwise
     * this handler is registered in the connection context and, provided the connection
     * is not stale, an output event is requested; a stale connection fails the exchange.
     */
    private void connectionAllocated(final NHttpClientConnection managedConn) {
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug("[exchange: " + this.id + "] Connection allocated: " + managedConn);
            }
            // Store the connection first so that discardConnection() below can see it.
            this.managedConnRef.set(managedConn);

            if (this.closed.get()) {
                discardConnection();
                return;
            }

            managedConn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, this);
            // Check for staleness BEFORE requesting output: there is no point in triggering
            // an output event on a connection that is about to be shut down by failed().
            if (managedConn.isStale()) {
                failed(new ConnectionClosedException("Connection closed"));
            } else {
                managedConn.requestOutput();
            }
        } catch (final RuntimeException runex) {
            failed(runex);
            throw runex;
        }
    }

    /** Fails the exchange because the connection request has failed. */
    private void connectionRequestFailed(final Exception ex) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("[exchange: " + this.id + "] connection request failed");
        }
        failed(ex);
    }

    /** Cancels the result future and closes the handler because the connection request got cancelled. */
    private void connectionRequestCancelled() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("[exchange: " + this.id + "] Connection request cancelled");
        }
        try {
            this.resultFuture.cancel();
        } finally {
            close();
        }
    }

    /**
     * Discards the currently held connection (if any), resets the keep-alive and routing
     * state and requests a new connection for the planned route from the connection manager.
     * Connect timeout and connection request timeout are taken from the request config
     * of the execution context and passed on in milliseconds.
     */
    final void requestConnection() {
        final HttpRoute route = this.routeRef.get();
        if (this.log.isDebugEnabled()) {
            this.log.debug("[exchange: " + this.id + "] Request connection for " + route);
        }

        discardConnection();

        this.validDurationRef.set(null);
        this.routeTrackerRef.set(null);
        this.routeEstablished.set(false);

        final Object userToken = this.localContext.getUserToken();
        final RequestConfig config = this.localContext.getRequestConfig();
        this.connmgr.requestConnection(
                route,
                userToken,
                config.getConnectTimeout(),
                config.getConnectionRequestTimeout(),
                TimeUnit.MILLISECONDS,
                // The callback forwards to distinctly named private methods: inside the
                // anonymous class the simple name failed(...) refers to the callback itself.
                new FutureCallback<NHttpClientConnection>() {

                    @Override
                    public void completed(final NHttpClientConnection managedConn) {
                        connectionAllocated(managedConn);
                    }

                    @Override
                    public void failed(final Exception ex) {
                        connectionRequestFailed(ex);
                    }

                    @Override
                    public void cancelled() {
                        connectionRequestCancelled();
                    }

                });
    }

    /** Called when the handler gets closed, failed or cancelled, after the connection has been discarded. */
    abstract void releaseResources();

    /** Called from {@link #failed(Exception)} before the connection is discarded. */
    abstract void executionFailed(final Exception ex);

    /**
     * Called from {@link #cancel()} before the connection is discarded.
     *
     * @return the value to be returned by {@link #cancel()}.
     */
    abstract boolean executionCancelled();

    /**
     * Closes the handler: discards the connection and releases resources.
     * Has no effect if the handler has already been closed, failed or cancelled.
     */
    @Override
    public final void close() {
        if (this.closed.compareAndSet(false, true)) {
            discardConnection();
            releaseResources();
        }
    }

    /** Returns {@code true} once the exchange has been marked as completed. */
    @Override
    public final boolean isDone() {
        return this.completed.get();
    }

    /**
     * Fails the exchange: notifies the subclass, discards the connection, releases
     * resources and finally fails the result future with the given exception.
     * Has no effect if the handler has already been closed, failed or cancelled.
     */
    @Override
    public final void failed(final Exception ex) {
        if (this.closed.compareAndSet(false, true)) {
            // Nested try / finally: clean-up and future notification take place
            // even if an earlier step throws.
            try {
                try {
                    executionFailed(ex);
                } finally {
                    discardConnection();
                    releaseResources();
                }
            } finally {
                this.resultFuture.failed(ex);
            }
        }
    }

    /**
     * Cancels the exchange: notifies the subclass, discards the connection, releases
     * resources and finally cancels the result future.
     *
     * @return the result of {@link #executionCancelled()}, or {@code false} if the handler
     *   has already been closed, failed or cancelled.
     */
    @Override
    public final boolean cancel() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("[exchange: " + this.id + "] Cancelled");
        }
        if (this.closed.compareAndSet(false, true)) {
            // Nested try / finally: clean-up and future notification take place
            // even if an earlier step throws.
            try {
                try {
                    return executionCancelled();
                } finally {
                    discardConnection();
                    releaseResources();
                }
            } finally {
                this.resultFuture.cancel();
            }
        }
        return false;
    }

}