| /* |
| * ==================================================================== |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * ==================================================================== |
| * |
| * This software consists of voluntary contributions made by many |
| * individuals on behalf of the Apache Software Foundation. For more |
| * information on the Apache Software Foundation, please see |
| * <http://www.apache.org/>. |
| * |
| */ |
| package org.apache.hc.core5.http.examples; |
| |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.net.InetSocketAddress; |
| import java.nio.ByteBuffer; |
| import java.nio.CharBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.hc.core5.concurrent.FutureCallback; |
| import org.apache.hc.core5.http.ConnectionClosedException; |
| import org.apache.hc.core5.http.ContentType; |
| import org.apache.hc.core5.http.EntityDetails; |
| import org.apache.hc.core5.http.Header; |
| import org.apache.hc.core5.http.HeaderElements; |
| import org.apache.hc.core5.http.HttpConnection; |
| import org.apache.hc.core5.http.HttpException; |
| import org.apache.hc.core5.http.HttpHeaders; |
| import org.apache.hc.core5.http.HttpHost; |
| import org.apache.hc.core5.http.HttpRequest; |
| import org.apache.hc.core5.http.HttpResponse; |
| import org.apache.hc.core5.http.HttpStatus; |
| import org.apache.hc.core5.http.URIScheme; |
| import org.apache.hc.core5.http.impl.BasicEntityDetails; |
| import org.apache.hc.core5.http.impl.Http1StreamListener; |
| import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap; |
| import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap; |
| import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; |
| import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; |
| import org.apache.hc.core5.http.impl.nio.BufferedData; |
| import org.apache.hc.core5.http.message.BasicHttpRequest; |
| import org.apache.hc.core5.http.message.BasicHttpResponse; |
| import org.apache.hc.core5.http.nio.AsyncClientEndpoint; |
| import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; |
| import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; |
| import org.apache.hc.core5.http.nio.CapacityChannel; |
| import org.apache.hc.core5.http.nio.DataStreamChannel; |
| import org.apache.hc.core5.http.nio.RequestChannel; |
| import org.apache.hc.core5.http.nio.ResponseChannel; |
| import org.apache.hc.core5.http.protocol.HttpContext; |
| import org.apache.hc.core5.http.protocol.HttpCoreContext; |
| import org.apache.hc.core5.http.protocol.HttpDateGenerator; |
| import org.apache.hc.core5.io.CloseMode; |
| import org.apache.hc.core5.pool.ConnPoolListener; |
| import org.apache.hc.core5.pool.ConnPoolStats; |
| import org.apache.hc.core5.pool.PoolStats; |
| import org.apache.hc.core5.reactor.IOReactorConfig; |
| import org.apache.hc.core5.util.TimeValue; |
| import org.apache.hc.core5.util.Timeout; |
| |
| /** |
| * Example of asynchronous embedded HTTP/1.1 reverse proxy with full content streaming. |
| */ |
| public class AsyncReverseProxyExample { |
| |
| private static boolean quiet; |
| |
| public static void main(final String[] args) throws Exception { |
| if (args.length < 1) { |
| System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]"); |
| System.exit(1); |
| } |
| // Target host |
| final HttpHost targetHost = HttpHost.create(args[0]); |
| int port = 8080; |
| if (args.length > 1) { |
| port = Integer.parseInt(args[1]); |
| } |
| for (final String s : args) { |
| if ("--quiet".equalsIgnoreCase(s)) { |
| quiet = true; |
| break; |
| } |
| } |
| |
| println("Reverse proxy to " + targetHost); |
| |
| final IOReactorConfig config = IOReactorConfig.custom() |
| .setSoTimeout(1, TimeUnit.MINUTES) |
| .build(); |
| |
| final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap() |
| .setIOReactorConfig(config) |
| .setConnPoolListener(new ConnPoolListener<HttpHost>() { |
| |
| @Override |
| public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) { |
| final StringBuilder buf = new StringBuilder(); |
| buf.append("[proxy->origin] connection leased ").append(route); |
| println(buf.toString()); |
| } |
| |
| @Override |
| public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) { |
| final StringBuilder buf = new StringBuilder(); |
| buf.append("[proxy->origin] connection released ").append(route); |
| final PoolStats totals = connPoolStats.getTotalStats(); |
| buf.append("; total kept alive: ").append(totals.getAvailable()).append("; "); |
| buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable()); |
| buf.append(" of ").append(totals.getMax()); |
| println(buf.toString()); |
| } |
| |
| }) |
| .setStreamListener(new Http1StreamListener() { |
| |
| @Override |
| public void onRequestHead(final HttpConnection connection, final HttpRequest request) { |
| // empty |
| } |
| |
| @Override |
| public void onResponseHead(final HttpConnection connection, final HttpResponse response) { |
| // empty |
| } |
| |
| @Override |
| public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) { |
| println("[proxy<-origin] connection " + |
| connection.getLocalAddress() + "->" + connection.getRemoteAddress() + |
| (keepAlive ? " kept alive" : " cannot be kept alive")); |
| } |
| |
| }) |
| .setMaxTotal(100) |
| .setDefaultMaxPerRoute(20) |
| .create(); |
| |
| final HttpAsyncServer server = AsyncServerBootstrap.bootstrap() |
| .setIOReactorConfig(config) |
| .setStreamListener(new Http1StreamListener() { |
| |
| @Override |
| public void onRequestHead(final HttpConnection connection, final HttpRequest request) { |
| // empty |
| } |
| |
| @Override |
| public void onResponseHead(final HttpConnection connection, final HttpResponse response) { |
| // empty |
| } |
| |
| @Override |
| public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) { |
| println("[client<-proxy] connection " + |
| connection.getLocalAddress() + "->" + connection.getRemoteAddress() + |
| (keepAlive ? " kept alive" : " cannot be kept alive")); |
| } |
| |
| }) |
| .register("*", () -> new IncomingExchangeHandler(targetHost, requester)) |
| .create(); |
| |
| Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
| println("Reverse proxy shutting down"); |
| server.close(CloseMode.GRACEFUL); |
| requester.close(CloseMode.GRACEFUL); |
| })); |
| |
| requester.start(); |
| server.start(); |
| server.listen(new InetSocketAddress(port), URIScheme.HTTP); |
| println("Listening on port " + port); |
| |
| server.awaitShutdown(TimeValue.MAX_VALUE); |
| } |
| |
| private static class ProxyBuffer extends BufferedData { |
| |
| ProxyBuffer(final int bufferSize) { |
| super(bufferSize); |
| } |
| |
| int write(final DataStreamChannel channel) throws IOException { |
| setOutputMode(); |
| if (buffer().hasRemaining()) { |
| return channel.write(buffer()); |
| } |
| return 0; |
| } |
| |
| } |
| |
| private static final AtomicLong COUNT = new AtomicLong(0); |
| |
| private static class ProxyExchangeState { |
| |
| final String id; |
| |
| HttpRequest request; |
| EntityDetails requestEntityDetails; |
| DataStreamChannel requestDataChannel; |
| CapacityChannel requestCapacityChannel; |
| ProxyBuffer inBuf; |
| boolean inputEnd; |
| |
| HttpResponse response; |
| EntityDetails responseEntityDetails; |
| ResponseChannel responseMessageChannel; |
| DataStreamChannel responseDataChannel; |
| CapacityChannel responseCapacityChannel; |
| ProxyBuffer outBuf; |
| boolean outputEnd; |
| |
| AsyncClientEndpoint clientEndpoint; |
| |
| ProxyExchangeState() { |
| this.id = String.format("%010d", COUNT.getAndIncrement()); |
| } |
| |
| } |
| |
| private static final int INIT_BUFFER_SIZE = 4096; |
| |
| private static class IncomingExchangeHandler implements AsyncServerExchangeHandler { |
| |
| private final HttpHost targetHost; |
| private final HttpAsyncRequester requester; |
| private final ProxyExchangeState exchangeState; |
| |
| IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) { |
| super(); |
| this.targetHost = targetHost; |
| this.requester = requester; |
| this.exchangeState = new ProxyExchangeState(); |
| } |
| |
| @Override |
| public void handleRequest( |
| final HttpRequest incomingRequest, |
| final EntityDetails entityDetails, |
| final ResponseChannel responseChannel, |
| final HttpContext httpContext) throws HttpException, IOException { |
| |
| synchronized (exchangeState) { |
| println("[client->proxy] " + exchangeState.id + " " + |
| incomingRequest.getMethod() + " " + incomingRequest.getRequestUri()); |
| exchangeState.request = incomingRequest; |
| exchangeState.requestEntityDetails = entityDetails; |
| exchangeState.inputEnd = entityDetails == null; |
| exchangeState.responseMessageChannel = responseChannel; |
| |
| if (entityDetails != null) { |
| final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT); |
| if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) { |
| responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext); |
| } |
| } |
| } |
| |
| println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost); |
| |
| requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() { |
| |
| @Override |
| public void completed(final AsyncClientEndpoint clientEndpoint) { |
| println("[proxy->origin] " + exchangeState.id + " connection leased"); |
| synchronized (exchangeState) { |
| exchangeState.clientEndpoint = clientEndpoint; |
| } |
| clientEndpoint.execute( |
| new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState), |
| HttpCoreContext.create()); |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE); |
| outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE); |
| final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage())); |
| final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(), |
| ContentType.TEXT_PLAIN); |
| synchronized (exchangeState) { |
| exchangeState.response = outgoingResponse; |
| exchangeState.responseEntityDetails = exEntityDetails; |
| exchangeState.outBuf = new ProxyBuffer(1024); |
| exchangeState.outBuf.put(msg); |
| exchangeState.outputEnd = true; |
| } |
| println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode()); |
| |
| try { |
| responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext); |
| } catch (final HttpException | IOException ignore) { |
| // ignore |
| } |
| } |
| |
| @Override |
| public void cancelled() { |
| failed(new InterruptedIOException()); |
| } |
| |
| }); |
| |
| } |
| |
| @Override |
| public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { |
| synchronized (exchangeState) { |
| exchangeState.requestCapacityChannel = capacityChannel; |
| final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE; |
| if (capacity > 0) { |
| println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity); |
| capacityChannel.update(capacity); |
| } |
| } |
| } |
| |
| @Override |
| public void consume(final ByteBuffer src) throws IOException { |
| synchronized (exchangeState) { |
| println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received"); |
| final DataStreamChannel dataChannel = exchangeState.requestDataChannel; |
| if (dataChannel != null && exchangeState.inBuf != null) { |
| if (exchangeState.inBuf.hasData()) { |
| final int bytesWritten = exchangeState.inBuf.write(dataChannel); |
| println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent"); |
| } |
| if (!exchangeState.inBuf.hasData()) { |
| final int bytesWritten = dataChannel.write(src); |
| println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent"); |
| } |
| } |
| if (src.hasRemaining()) { |
| if (exchangeState.inBuf == null) { |
| exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE); |
| } |
| exchangeState.inBuf.put(src); |
| } |
| final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE; |
| println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity); |
| if (dataChannel != null) { |
| dataChannel.requestOutput(); |
| } |
| } |
| } |
| |
| @Override |
| public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException { |
| synchronized (exchangeState) { |
| println("[client->proxy] " + exchangeState.id + " end of input"); |
| exchangeState.inputEnd = true; |
| final DataStreamChannel dataChannel = exchangeState.requestDataChannel; |
| if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) { |
| println("[proxy->origin] " + exchangeState.id + " end of output"); |
| dataChannel.endStream(); |
| } |
| } |
| } |
| |
| @Override |
| public int available() { |
| synchronized (exchangeState) { |
| final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0; |
| println("[client<-proxy] " + exchangeState.id + " output available: " + available); |
| return available; |
| } |
| } |
| |
| @Override |
| public void produce(final DataStreamChannel channel) throws IOException { |
| synchronized (exchangeState) { |
| println("[client<-proxy] " + exchangeState.id + " produce output"); |
| exchangeState.responseDataChannel = channel; |
| |
| if (exchangeState.outBuf != null) { |
| if (exchangeState.outBuf.hasData()) { |
| final int bytesWritten = exchangeState.outBuf.write(channel); |
| println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent"); |
| } |
| if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) { |
| channel.endStream(); |
| println("[client<-proxy] " + exchangeState.id + " end of output"); |
| } |
| if (!exchangeState.outputEnd) { |
| final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel; |
| if (capacityChannel != null) { |
| final int capacity = exchangeState.outBuf.capacity(); |
| if (capacity > 0) { |
| println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity); |
| capacityChannel.update(capacity); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage()); |
| if (!(cause instanceof ConnectionClosedException)) { |
| cause.printStackTrace(System.out); |
| } |
| synchronized (exchangeState) { |
| if (exchangeState.clientEndpoint != null) { |
| exchangeState.clientEndpoint.releaseAndDiscard(); |
| } |
| } |
| } |
| |
| @Override |
| public void releaseResources() { |
| synchronized (exchangeState) { |
| exchangeState.responseMessageChannel = null; |
| exchangeState.responseDataChannel = null; |
| exchangeState.requestCapacityChannel = null; |
| } |
| } |
| |
| } |
| |
| private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler { |
| |
| private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( |
| HttpHeaders.HOST.toLowerCase(Locale.ROOT), |
| HttpHeaders.CONTENT_LENGTH.toLowerCase(Locale.ROOT), |
| HttpHeaders.TRANSFER_ENCODING.toLowerCase(Locale.ROOT), |
| HttpHeaders.CONNECTION.toLowerCase(Locale.ROOT), |
| HttpHeaders.KEEP_ALIVE.toLowerCase(Locale.ROOT), |
| HttpHeaders.PROXY_AUTHENTICATE.toLowerCase(Locale.ROOT), |
| HttpHeaders.TE.toLowerCase(Locale.ROOT), |
| HttpHeaders.TRAILER.toLowerCase(Locale.ROOT), |
| HttpHeaders.UPGRADE.toLowerCase(Locale.ROOT)))); |
| |
| private final HttpHost targetHost; |
| private final AsyncClientEndpoint clientEndpoint; |
| private final ProxyExchangeState exchangeState; |
| |
| OutgoingExchangeHandler( |
| final HttpHost targetHost, |
| final AsyncClientEndpoint clientEndpoint, |
| final ProxyExchangeState exchangeState) { |
| this.targetHost = targetHost; |
| this.clientEndpoint = clientEndpoint; |
| this.exchangeState = exchangeState; |
| } |
| |
| @Override |
| public void produceRequest( |
| final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { |
| synchronized (exchangeState) { |
| final HttpRequest incomingRequest = exchangeState.request; |
| final EntityDetails entityDetails = exchangeState.requestEntityDetails; |
| final HttpRequest outgoingRequest = new BasicHttpRequest( |
| incomingRequest.getMethod(), |
| targetHost, |
| incomingRequest.getPath()); |
| for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) { |
| final Header header = it.next(); |
| if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) { |
| outgoingRequest.addHeader(header); |
| } |
| } |
| |
| println("[proxy->origin] " + exchangeState.id + " " + |
| outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri()); |
| |
| channel.sendRequest(outgoingRequest, entityDetails, httpContext); |
| } |
| } |
| |
| @Override |
| public int available() { |
| synchronized (exchangeState) { |
| final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0; |
| println("[proxy->origin] " + exchangeState.id + " output available: " + available); |
| return available; |
| } |
| } |
| |
| @Override |
| public void produce(final DataStreamChannel channel) throws IOException { |
| synchronized (exchangeState) { |
| println("[proxy->origin] " + exchangeState.id + " produce output"); |
| exchangeState.requestDataChannel = channel; |
| if (exchangeState.inBuf != null) { |
| if (exchangeState.inBuf.hasData()) { |
| final int bytesWritten = exchangeState.inBuf.write(channel); |
| println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent"); |
| } |
| if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) { |
| channel.endStream(); |
| println("[proxy->origin] " + exchangeState.id + " end of output"); |
| } |
| if (!exchangeState.inputEnd) { |
| final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel; |
| if (capacityChannel != null) { |
| final int capacity = exchangeState.inBuf.capacity(); |
| if (capacity > 0) { |
| println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity); |
| capacityChannel.update(capacity); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { |
| // ignore |
| } |
| |
| @Override |
| public void consumeResponse( |
| final HttpResponse incomingResponse, |
| final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { |
| synchronized (exchangeState) { |
| println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode()); |
| if (entityDetails == null) { |
| println("[proxy<-origin] " + exchangeState.id + " end of input"); |
| } |
| |
| final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode()); |
| for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) { |
| final Header header = it.next(); |
| if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) { |
| outgoingResponse.addHeader(header); |
| } |
| } |
| |
| exchangeState.response = outgoingResponse; |
| exchangeState.responseEntityDetails = entityDetails; |
| exchangeState.outputEnd = entityDetails == null; |
| |
| final ResponseChannel responseChannel = exchangeState.responseMessageChannel; |
| if (responseChannel != null) { |
| // responseChannel can be null under load. |
| responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext); |
| } |
| |
| println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode()); |
| if (entityDetails == null) { |
| println("[client<-proxy] " + exchangeState.id + " end of output"); |
| clientEndpoint.releaseAndReuse(); |
| } |
| } |
| } |
| |
| @Override |
| public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { |
| synchronized (exchangeState) { |
| exchangeState.responseCapacityChannel = capacityChannel; |
| final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE; |
| if (capacity > 0) { |
| println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity); |
| capacityChannel.update(capacity); |
| } |
| } |
| } |
| |
| @Override |
| public void consume(final ByteBuffer src) throws IOException { |
| synchronized (exchangeState) { |
| println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received"); |
| final DataStreamChannel dataChannel = exchangeState.responseDataChannel; |
| if (dataChannel != null && exchangeState.outBuf != null) { |
| if (exchangeState.outBuf.hasData()) { |
| final int bytesWritten = exchangeState.outBuf.write(dataChannel); |
| println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent"); |
| } |
| if (!exchangeState.outBuf.hasData()) { |
| final int bytesWritten = dataChannel.write(src); |
| println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent"); |
| } |
| } |
| if (src.hasRemaining()) { |
| if (exchangeState.outBuf == null) { |
| exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE); |
| } |
| exchangeState.outBuf.put(src); |
| } |
| final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE; |
| println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity); |
| if (dataChannel != null) { |
| dataChannel.requestOutput(); |
| } |
| } |
| } |
| |
| @Override |
| public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException { |
| synchronized (exchangeState) { |
| println("[proxy<-origin] " + exchangeState.id + " end of input"); |
| exchangeState.outputEnd = true; |
| final DataStreamChannel dataChannel = exchangeState.responseDataChannel; |
| if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) { |
| println("[client<-proxy] " + exchangeState.id + " end of output"); |
| dataChannel.endStream(); |
| clientEndpoint.releaseAndReuse(); |
| } |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| clientEndpoint.releaseAndDiscard(); |
| } |
| |
| @Override |
| public void failed(final Exception cause) { |
| println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage()); |
| if (!(cause instanceof ConnectionClosedException)) { |
| cause.printStackTrace(System.out); |
| } |
| synchronized (exchangeState) { |
| if (exchangeState.response == null) { |
| final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR; |
| final HttpResponse outgoingResponse = new BasicHttpResponse(status); |
| outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE); |
| exchangeState.response = outgoingResponse; |
| |
| final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage())); |
| final int contentLen = msg.remaining(); |
| exchangeState.outBuf = new ProxyBuffer(1024); |
| exchangeState.outBuf.put(msg); |
| exchangeState.outputEnd = true; |
| |
| println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode()); |
| |
| try { |
| final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN); |
| exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null); |
| } catch (final HttpException | IOException ignore) { |
| // ignore |
| } |
| } else { |
| exchangeState.outputEnd = true; |
| } |
| clientEndpoint.releaseAndDiscard(); |
| } |
| } |
| |
| @Override |
| public void releaseResources() { |
| synchronized (exchangeState) { |
| exchangeState.requestDataChannel = null; |
| exchangeState.responseCapacityChannel = null; |
| clientEndpoint.releaseAndDiscard(); |
| } |
| } |
| |
| } |
| |
| static void println(final String msg) { |
| if (!quiet) { |
| System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg); |
| } |
| } |
| } |