blob: 6ebe71554f398ef91985637f2c81644eaa38fed4 [file] [log] [blame]
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.http.impl.bootstrap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.concurrent.FutureContribution;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.impl.DefaultAddressResolver;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.pool.PoolEntry;
import org.apache.hc.core5.pool.PoolStats;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.EndpointParameters;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.IOSessionListener;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
/**
* HTTP/1.1 client side message exchange initiator.
*
* @since 5.0
*/
public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
private final ManagedConnPool<HttpHost, IOSession> connPool;
/**
* Use {@link AsyncRequesterBootstrap} to create instances of this class.
*/
@Internal
public HttpAsyncRequester(
final IOReactorConfig ioReactorConfig,
final IOEventHandlerFactory eventHandlerFactory,
final Decorator<IOSession> ioSessionDecorator,
final Callback<Exception> exceptionCallback,
final IOSessionListener sessionListener,
final ManagedConnPool<HttpHost, IOSession> connPool) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
this.connPool = Args.notNull(connPool, "Connection pool");
}
@Override
public PoolStats getTotalStats() {
return connPool.getTotalStats();
}
@Override
public PoolStats getStats(final HttpHost route) {
return connPool.getStats(route);
}
@Override
public void setMaxTotal(final int max) {
connPool.setMaxTotal(max);
}
@Override
public int getMaxTotal() {
return connPool.getMaxTotal();
}
@Override
public void setDefaultMaxPerRoute(final int max) {
connPool.setDefaultMaxPerRoute(max);
}
@Override
public int getDefaultMaxPerRoute() {
return connPool.getDefaultMaxPerRoute();
}
@Override
public void setMaxPerRoute(final HttpHost route, final int max) {
connPool.setMaxPerRoute(route, max);
}
@Override
public int getMaxPerRoute(final HttpHost route) {
return connPool.getMaxPerRoute(route);
}
@Override
public void closeIdle(final TimeValue idleTime) {
connPool.closeIdle(idleTime);
}
@Override
public void closeExpired() {
connPool.closeExpired();
}
@Override
public Set<HttpHost> getRoutes() {
return connPool.getRoutes();
}
public Future<AsyncClientEndpoint> connect(
final HttpHost host,
final Timeout timeout,
final Object attachment,
final FutureCallback<AsyncClientEndpoint> callback) {
return doConnect(host, timeout, attachment, callback);
}
protected Future<AsyncClientEndpoint> doConnect(
final HttpHost host,
final Timeout timeout,
final Object attachment,
final FutureCallback<AsyncClientEndpoint> callback) {
Args.notNull(host, "Host");
Args.notNull(timeout, "Timeout");
final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
@Override
public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
final IOSession ioSession = poolEntry.getConnection();
if (ioSession != null && !ioSession.isOpen()) {
poolEntry.discardConnection(CloseMode.IMMEDIATE);
}
if (poolEntry.hasConnection()) {
resultFuture.completed(endpoint);
} else {
final Future<IOSession> future = requestSession(
host,
timeout,
new EndpointParameters(host, attachment),
new FutureCallback<IOSession>() {
@Override
public void completed(final IOSession session) {
session.setSocketTimeout(timeout);
poolEntry.assignConnection(session);
resultFuture.completed(endpoint);
}
@Override
public void failed(final Exception cause) {
try {
resultFuture.failed(cause);
} finally {
endpoint.releaseAndDiscard();
}
}
@Override
public void cancelled() {
try {
resultFuture.cancel();
} finally {
endpoint.releaseAndDiscard();
}
}
});
resultFuture.setDependency(future);
}
}
@Override
public void failed(final Exception ex) {
resultFuture.failed(ex);
}
@Override
public void cancelled() {
resultFuture.cancel();
}
});
resultFuture.setDependency(leaseFuture);
return resultFuture;
}
public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) {
return connect(host, timeout, null, null);
}
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final Timeout timeout,
final HttpContext executeContext) {
Args.notNull(exchangeHandler, "Exchange handler");
Args.notNull(timeout, "Timeout");
Args.notNull(executeContext, "Context");
try {
exchangeHandler.produceRequest((request, entityDetails, requestContext) -> {
final String scheme = request.getScheme();
final URIAuthority authority = request.getAuthority();
if (authority == null) {
throw new ProtocolException("Request authority not specified");
}
final HttpHost target = new HttpHost(scheme, authority);
connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
@Override
public void completed(final AsyncClientEndpoint endpoint) {
endpoint.execute(new AsyncClientExchangeHandler() {
@Override
public void releaseResources() {
endpoint.releaseAndDiscard();
exchangeHandler.releaseResources();
}
@Override
public void failed(final Exception cause) {
endpoint.releaseAndDiscard();
exchangeHandler.failed(cause);
}
@Override
public void cancel() {
endpoint.releaseAndDiscard();
exchangeHandler.cancel();
}
@Override
public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
channel.sendRequest(request, entityDetails, httpContext);
}
@Override
public int available() {
return exchangeHandler.available();
}
@Override
public void produce(final DataStreamChannel channel) throws IOException {
exchangeHandler.produce(channel);
}
@Override
public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
exchangeHandler.consumeInformation(response, httpContext);
}
@Override
public void consumeResponse(
final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
if (entityDetails == null) {
endpoint.releaseAndReuse();
}
exchangeHandler.consumeResponse(response, entityDetails, httpContext);
}
@Override
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
exchangeHandler.updateCapacity(capacityChannel);
}
@Override
public void consume(final ByteBuffer src) throws IOException {
exchangeHandler.consume(src);
}
@Override
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
endpoint.releaseAndReuse();
exchangeHandler.streamEnd(trailers);
}
}, pushHandlerFactory, executeContext);
}
@Override
public void failed(final Exception ex) {
exchangeHandler.failed(ex);
}
@Override
public void cancelled() {
exchangeHandler.cancel();
}
});
}, executeContext);
} catch (final IOException | HttpException ex) {
exchangeHandler.failed(ex);
}
}
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
final Timeout timeout,
final HttpContext executeContext) {
execute(exchangeHandler, null, timeout, executeContext);
}
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final Timeout timeout,
final HttpContext context,
final FutureCallback<T> callback) {
Args.notNull(requestProducer, "Request producer");
Args.notNull(responseConsumer, "Response consumer");
Args.notNull(timeout, "Timeout");
final BasicFuture<T> future = new BasicFuture<>(callback);
final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
requestProducer,
responseConsumer,
new FutureContribution<T>(future) {
@Override
public void completed(final T result) {
future.completed(result);
}
});
execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
return future;
}
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final Timeout timeout,
final HttpContext context,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, null, timeout, context, callback);
}
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final Timeout timeout,
final FutureCallback<T> callback) {
return execute(requestProducer, responseConsumer, null, timeout, null, callback);
}
private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
this.poolEntryRef = new AtomicReference<>(poolEntry);
}
private IOSession getIOSession() {
final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
if (poolEntry == null) {
throw new IllegalStateException("Endpoint has already been released");
}
final IOSession ioSession = poolEntry.getConnection();
if (ioSession == null) {
throw new IllegalStateException("I/O session is invalid");
}
return ioSession;
}
@Override
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
final IOSession ioSession = getIOSession();
ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
if (!ioSession.isOpen()) {
try {
exchangeHandler.failed(new ConnectionClosedException());
} finally {
exchangeHandler.releaseResources();
}
}
}
@Override
public boolean isConnected() {
final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
if (poolEntry != null) {
final IOSession ioSession = poolEntry.getConnection();
if (ioSession == null || !ioSession.isOpen()) {
return false;
}
final IOEventHandler handler = ioSession.getHandler();
return (handler instanceof HttpConnection) && ((HttpConnection) handler).isOpen();
}
return false;
}
@Override
public void releaseAndReuse() {
final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
if (poolEntry != null) {
final IOSession ioSession = poolEntry.getConnection();
connPool.release(poolEntry, ioSession != null && ioSession.isOpen());
}
}
@Override
public void releaseAndDiscard() {
final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
if (poolEntry != null) {
poolEntry.discardConnection(CloseMode.GRACEFUL);
connPool.release(poolEntry, false);
}
}
}
}