blob: 35cf169c348534966bb60a8bd2ff39ecc5f8e6e1 [file] [log] [blame]
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.http.impl.nio;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLSession;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.ContentLengthStrategy;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpMessage;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Http1Config;
import org.apache.hc.core5.http.impl.BasicEndpointDetails;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
import org.apache.hc.core5.http.impl.CharCodingSupport;
import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
import org.apache.hc.core5.http.impl.IncomingEntityDetails;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.ContentDecoder;
import org.apache.hc.core5.http.nio.ContentEncoder;
import org.apache.hc.core5.http.nio.NHttpMessageParser;
import org.apache.hc.core5.http.nio.NHttpMessageWriter;
import org.apache.hc.core5.http.nio.SessionInputBuffer;
import org.apache.hc.core5.http.nio.SessionOutputBuffer;
import org.apache.hc.core5.http.nio.command.CommandSupport;
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.EventMask;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Identifiable;
import org.apache.hc.core5.util.Timeout;
abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
implements Identifiable, HttpConnection {
// Connection life-cycle states. Declaration order is significant: the class compares
// states with compareTo (for example ">= GRACEFUL_SHUTDOWN"), so later constants
// represent states that are closer to a closed connection.
private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
// I/O session used for reading, writing, event registration, locking and the command queue.
private final ProtocolIOSession ioSession;
// HTTP/1 configuration; falls back to Http1Config.DEFAULT when the constructor receives null.
private final Http1Config http1Config;
// Session buffers that message heads and content pass through.
private final SessionInputBufferImpl inbuf;
private final SessionOutputBufferImpl outbuf;
// Byte counters for each direction, aggregated by connMetrics.
private final BasicHttpTransportMetrics inTransportMetrics;
private final BasicHttpTransportMetrics outTransportMetrics;
private final BasicHttpConnectionMetrics connMetrics;
// Parser / writer for message heads.
private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
// Strategies used to determine the content length of incoming / outgoing messages.
private final ContentLengthStrategy incomingContentStrategy;
private final ContentLengthStrategy outgoingContentStrategy;
// Scratch buffer that decoded incoming content is read into before it is passed to consumeData.
private final ByteBuffer contentBuffer;
// Number of output requests made through requestSessionOutput() that onOutput() has not yet accounted for.
private final AtomicInteger outputRequests;
// Incoming message whose body is currently being decoded; null between messages.
private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
// Outgoing message whose body is currently being encoded; null when no body is in progress.
private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
// Current life-cycle state; starts as READY.
private volatile ConnectionState connState;
// Input capacity window of the most recently parsed incoming message; null until the first head is parsed.
private volatile CapacityWindow capacityWindow;
// Protocol version taken from the most recently parsed incoming message head.
private volatile ProtocolVersion version;
// Lazily created by getEndpointDetails().
private volatile EndpointDetails endpointDetails;
/**
 * Creates a duplexer bound to the given I/O session.
 *
 * @param ioSession the I/O session; must not be {@code null}.
 * @param http1Config HTTP/1 configuration; {@link Http1Config#DEFAULT} is used when {@code null}.
 * @param charCodingConfig character coding configuration used to create the
 *                         decoder / encoder of the session buffers.
 * @param incomingMessageParser parser for incoming message heads.
 * @param outgoingMessageWriter writer for outgoing message heads.
 * @param incomingContentStrategy content length strategy for incoming messages;
 *                                {@link DefaultContentLengthStrategy#INSTANCE} is used when {@code null}.
 * @param outgoingContentStrategy content length strategy for outgoing messages;
 *                                {@link DefaultContentLengthStrategy#INSTANCE} is used when {@code null}.
 */
AbstractHttp1StreamDuplexer(
        final ProtocolIOSession ioSession,
        final Http1Config http1Config,
        final CharCodingConfig charCodingConfig,
        final NHttpMessageParser<IncomingMessage> incomingMessageParser,
        final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
        final ContentLengthStrategy incomingContentStrategy,
        final ContentLengthStrategy outgoingContentStrategy) {
    this.ioSession = Args.notNull(ioSession, "I/O session");
    if (http1Config != null) {
        this.http1Config = http1Config;
    } else {
        this.http1Config = Http1Config.DEFAULT;
    }

    // Both session buffers share the configured size; their second size argument is capped at 512.
    final int bufferSize = this.http1Config.getBufferSize();
    final int cappedSize = Math.min(bufferSize, 512);
    this.inbuf = new SessionInputBufferImpl(
            bufferSize,
            cappedSize,
            this.http1Config.getMaxLineLength(),
            CharCodingSupport.createDecoder(charCodingConfig));
    this.outbuf = new SessionOutputBufferImpl(
            bufferSize,
            cappedSize,
            CharCodingSupport.createEncoder(charCodingConfig));

    this.inTransportMetrics = new BasicHttpTransportMetrics();
    this.outTransportMetrics = new BasicHttpTransportMetrics();
    this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);

    this.incomingMessageParser = incomingMessageParser;
    this.outgoingMessageWriter = outgoingMessageWriter;
    if (incomingContentStrategy != null) {
        this.incomingContentStrategy = incomingContentStrategy;
    } else {
        this.incomingContentStrategy = DefaultContentLengthStrategy.INSTANCE;
    }
    if (outgoingContentStrategy != null) {
        this.outgoingContentStrategy = outgoingContentStrategy;
    } else {
        this.outgoingContentStrategy = DefaultContentLengthStrategy.INSTANCE;
    }

    this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
    this.outputRequests = new AtomicInteger(0);
    this.connState = ConnectionState.READY;
}
/**
 * Returns the identifier of the underlying I/O session.
 */
@Override
public String getId() {
return ioSession.getId();
}
/**
 * Shuts the session down in the requested mode.
 * <p>
 * A graceful request moves the connection to {@code GRACEFUL_SHUTDOWN} and enqueues a
 * graceful shutdown command; any other mode marks the connection as {@code SHUTDOWN}
 * and closes the I/O session right away.
 *
 * @param closeMode the requested close mode.
 */
void shutdownSession(final CloseMode closeMode) {
    if (closeMode != CloseMode.GRACEFUL) {
        connState = ConnectionState.SHUTDOWN;
        ioSession.close();
        return;
    }
    connState = ConnectionState.GRACEFUL_SHUTDOWN;
    ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
}
/**
 * Shuts the session down because of the given exception.
 * <p>
 * The connection is marked as {@code SHUTDOWN}, {@link #terminate(Exception)} is invoked,
 * and the I/O session is closed even if termination throws. The session is closed
 * immediately for an {@link IOException} other than {@link ConnectionClosedException},
 * and gracefully for every other cause.
 *
 * @param cause the exception that triggered the shutdown.
 */
void shutdownSession(final Exception cause) {
    connState = ConnectionState.SHUTDOWN;
    try {
        terminate(cause);
    } finally {
        final boolean abrupt = cause instanceof IOException
                && !(cause instanceof ConnectionClosedException);
        ioSession.close(abrupt ? CloseMode.IMMEDIATE : CloseMode.GRACEFUL);
    }
}
// ---- Hooks implemented by concrete duplexers ----
// Invoked from onDisconnect() before pending commands are cancelled.
abstract void disconnected();
// Invoked from shutdownSession(Exception) with the cause, before the I/O session is closed.
abstract void terminate(final Exception exception);
// Invoked from onInput() right after an incoming message head has been parsed.
abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
// Invoked from commitMessageHead() right after the head has been written to the output buffer.
abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
// Receives a parsed message head; entityDetails is null when no content decoder was created for it.
abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
// Return value decides whether onInput() attempts to create a content decoder for the message.
abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
// Return value decides whether commitMessageHead() attempts to create a content encoder for the message.
abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
// Creates the decoder for an incoming body of the given length (as determined by
// incomingContentStrategy). A null result is treated by onInput() as "no body".
abstract ContentDecoder createContentDecoder(
long contentLength,
ReadableByteChannel channel,
SessionInputBuffer buffer,
BasicHttpTransportMetrics metrics) throws HttpException;
// Creates the encoder for an outgoing body of the given length (as determined by
// outgoingContentStrategy). A null result means no outgoing message is tracked.
abstract ContentEncoder createContentEncoder(
long contentLength,
WritableByteChannel channel,
SessionOutputBuffer buffer,
BasicHttpTransportMetrics metrics) throws HttpException;
// Receives decoded body content; onInput() passes the flipped content buffer.
abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
// Invoked from onInput() when the capacity window drops to zero or below and the decoder is not completed.
abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
// Invoked from onInput() with the decoder's trailers once the content decoder reports completion.
abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
// Queried by onOutput() after produceOutput(); a true result keeps write interest registered.
abstract boolean isOutputReady();
// Invoked from onOutput() while the connection has not reached SHUTDOWN.
abstract void produceOutput() throws HttpException, IOException;
// Invoked from processCommands() for a request execution command when the connection is not shutting down.
abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
// Invoked from onInput() when a message without a body has been consumed or a body has been fully read.
abstract void inputEnd() throws HttpException, IOException;
// Invoked from onOutput() when there is no outgoing message and the output buffer is empty.
abstract void outputEnd() throws HttpException, IOException;
// Queried together with outputIdle() when deciding how to react to end of stream or a pending shutdown.
abstract boolean inputIdle();
abstract boolean outputIdle();
// Invoked from onTimeout(); a false result makes onTimeout() raise a socket timeout exception.
abstract boolean handleTimeout();
/**
 * Drains the session command queue.
 * <p>
 * Shutdown commands are applied via {@link #requestShutdown(CloseMode)} and processing
 * continues. A request execution command is cancelled if the connection is already
 * shutting down; otherwise it is executed and processing stops, leaving any remaining
 * commands queued.
 *
 * @throws HttpException if a command of an unexpected type is encountered,
 *                       or if command execution fails.
 * @throws IOException if command execution fails with an I/O error.
 */
private void processCommands() throws HttpException, IOException {
    Command command;
    while ((command = ioSession.poll()) != null) {
        if (command instanceof ShutdownCommand) {
            requestShutdown(((ShutdownCommand) command).getType());
            continue;
        }
        if (!(command instanceof RequestExecutionCommand)) {
            throw new HttpException("Unexpected command: " + command.getClass());
        }
        if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
            command.cancel();
            continue;
        }
        execute((RequestExecutionCommand) command);
        return;
    }
}
/**
 * Activates the connection: moves it from {@code READY} to {@code ACTIVE} and processes
 * queued commands. Does nothing when the connection is in any other state.
 */
public final void onConnect() throws HttpException, IOException {
    if (connState != ConnectionState.READY) {
        return;
    }
    connState = ConnectionState.ACTIVE;
    processCommands();
}
/**
 * Attempts to parse an incoming message head from the session input buffer.
 * The parser is reset only after a head has been produced.
 *
 * @param endOfStream whether the end of the input stream has been reached.
 * @return the parsed message head, or {@code null} if the parser did not produce one.
 */
IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
    final IncomingMessage parsed = incomingMessageParser.parse(inbuf, endOfStream);
    if (parsed == null) {
        return null;
    }
    incomingMessageParser.reset();
    return parsed;
}
/**
 * Handles an input event: parses incoming message heads, decodes message content and
 * passes both to the hooks implemented by subclasses.
 *
 * @param src optional data to append to the session input buffer before processing;
 *            may be {@code null}.
 */
public final void onInput(final ByteBuffer src) throws HttpException, IOException {
if (src != null) {
inbuf.put(src);
}
// While shutting down with buffered data and idle input, stop reading instead of
// starting to process another message.
if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
ioSession.clearEvent(SelectionKey.OP_READ);
return;
}
boolean endOfStream = false;
// The session buffer is filled here only between messages; while a body is in progress
// the content decoder is handed the session and the buffer and reads through them.
if (incomingMessage == null) {
final int bytesRead = inbuf.fill(ioSession);
if (bytesRead > 0) {
inTransportMetrics.incrementBytesTransferred(bytesRead);
}
endOfStream = bytesRead == -1;
}
do {
if (incomingMessage == null) {
final IncomingMessage messageHead = parseMessageHead(endOfStream);
if (messageHead != null) {
this.version = messageHead.getVersion();
updateInputMetrics(messageHead, connMetrics);
final ContentDecoder contentDecoder;
if (handleIncomingMessage(messageHead)) {
final long len = incomingContentStrategy.determineLength(messageHead);
contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
} else {
consumeHeader(messageHead, null);
contentDecoder = null;
}
// Each parsed message head gets a fresh capacity window.
capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
if (contentDecoder != null) {
incomingMessage = new Message<>(messageHead, contentDecoder);
} else {
// No body: the message is complete as soon as its head has been consumed.
inputEnd();
if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
ioSession.setEvent(SelectionKey.OP_READ);
}
}
} else {
// No complete message head available yet.
break;
}
}
if (incomingMessage != null) {
final ContentDecoder contentDecoder = incomingMessage.getBody();
// At present the consumer can be forced to consume data
// over its declared capacity in order to avoid having
// unprocessed message body content stuck in the session
// input buffer
final int bytesRead = contentDecoder.read(contentBuffer);
if (bytesRead > 0) {
contentBuffer.flip();
consumeData(contentBuffer);
contentBuffer.clear();
// Deduct what was consumed; once the window is exhausted, ask the consumer for
// more capacity unless the body is already complete.
final int capacity = capacityWindow.removeCapacity(bytesRead);
if (capacity <= 0) {
if (!contentDecoder.isCompleted()) {
updateCapacity(capacityWindow);
}
}
}
if (contentDecoder.isCompleted()) {
dataEnd(contentDecoder.getTrailers());
// Closing the window makes later update() calls on it no-ops.
capacityWindow.close();
incomingMessage = null;
ioSession.setEvent(SelectionKey.OP_READ);
inputEnd();
}
if (bytesRead == 0) {
// The decoder produced no data in this pass.
break;
}
}
} while (inbuf.hasData());
// End of stream with nothing left buffered: shut down gracefully if both directions
// are idle, otherwise treat it as a connection closed by the peer.
if (endOfStream && !inbuf.hasData()) {
if (outputIdle() && inputIdle()) {
requestShutdown(CloseMode.GRACEFUL);
} else {
shutdownSession(new ConnectionClosedException("Connection closed by peer"));
}
}
}
/**
 * Handles an output event: flushes buffered output, lets the subclass produce more
 * output, updates write interest, and closes the I/O session once the connection has
 * reached the {@code SHUTDOWN} state.
 */
public final void onOutput() throws IOException, HttpException {
// Flush whatever is buffered, under the session lock, and count the bytes written.
ioSession.getLock().lock();
try {
if (outbuf.hasData()) {
final int bytesWritten = outbuf.flush(ioSession);
if (bytesWritten > 0) {
outTransportMetrics.incrementBytesTransferred(bytesWritten);
}
}
} finally {
ioSession.getLock().unlock();
}
if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
// Snapshot the request counter before producing output so that requests made
// while output is being produced are not lost below.
final int pendingOutputRequests = outputRequests.get();
produceOutput();
final boolean outputPending = isOutputReady();
final boolean outputEnd;
ioSession.getLock().lock();
try {
// Clear write interest only if nothing is pending and no new output request arrived
// since the snapshot; otherwise subtract just the requests accounted for here.
if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
ioSession.clearEvent(SelectionKey.OP_WRITE);
} else {
outputRequests.addAndGet(-pendingOutputRequests);
}
outputEnd = outgoingMessage == null && !outbuf.hasData();
} finally {
ioSession.getLock().unlock();
}
if (outputEnd) {
outputEnd();
if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
processCommands();
} else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
// A pending graceful shutdown completes once both directions are idle.
connState = ConnectionState.SHUTDOWN;
}
}
}
if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
ioSession.close();
}
}
/**
 * Handles a timeout event. If {@link #handleTimeout()} reports that the timeout was
 * not handled, a socket timeout exception created for the given timeout is passed
 * to {@link #onException(Exception)}.
 *
 * @param timeout the timeout that elapsed.
 */
public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
    final boolean handled = handleTimeout();
    if (handled) {
        return;
    }
    onException(SocketTimeoutExceptionFactory.create(timeout));
}
/**
 * Handles an exception: shuts the session down with the given cause, then fails the
 * commands of the I/O session with the same exception.
 */
public final void onException(final Exception ex) {
shutdownSession(ex);
CommandSupport.failCommands(ioSession, ex);
}
/**
 * Handles a disconnect event: notifies the subclass through {@link #disconnected()},
 * then cancels the commands of the I/O session.
 */
public final void onDisconnect() {
disconnected();
CommandSupport.cancelCommands(ioSession);
}
/**
 * Records a shutdown request and asks for an output event.
 * <p>
 * An immediate request moves the connection straight to {@code SHUTDOWN}. A graceful
 * request moves it to {@code GRACEFUL_SHUTDOWN}, but only when it is currently
 * {@code ACTIVE}. In both cases write interest is set on the I/O session.
 *
 * @param closeMode the requested close mode.
 */
void requestShutdown(final CloseMode closeMode) {
    switch (closeMode) {
        case IMMEDIATE:
            connState = ConnectionState.SHUTDOWN;
            break;
        case GRACEFUL:
            if (connState == ConnectionState.ACTIVE) {
                connState = ConnectionState.GRACEFUL_SHUTDOWN;
            }
            break;
        default:
            break;
    }
    ioSession.setEvent(SelectionKey.OP_WRITE);
}
/**
 * Writes an outgoing message head to the session output buffer and, unless the
 * stream ends with the head, sets up the content encoder for the message body.
 * <p>
 * The whole operation runs under the I/O session lock. Write interest is always set
 * at the end so that any remaining buffered output is flushed by {@link #onOutput()}.
 *
 * @param messageHead the message head to write.
 * @param endStream {@code true} if no message body follows the head.
 * @param flushMode {@code FlushMode.IMMEDIATE} to flush the output buffer to the
 *                  session right away; any other value leaves flushing to the next
 *                  output event.
 * @throws HttpException if the head cannot be written or the encoder cannot be created.
 * @throws IOException in case of an I/O error.
 */
void commitMessageHead(
        final OutgoingMessage messageHead,
        final boolean endStream,
        final FlushMode flushMode) throws HttpException, IOException {
    ioSession.getLock().lock();
    try {
        outgoingMessageWriter.write(messageHead, outbuf);
        updateOutputMetrics(messageHead, connMetrics);
        if (!endStream) {
            final ContentEncoder contentEncoder;
            if (handleOutgoingMessage(messageHead)) {
                final long len = outgoingContentStrategy.determineLength(messageHead);
                contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
            } else {
                contentEncoder = null;
            }
            // Only track an outgoing message when there is a body to encode.
            if (contentEncoder != null) {
                outgoingMessage = new Message<>(messageHead, contentEncoder);
            }
        }
        outgoingMessageWriter.reset();
        if (flushMode == FlushMode.IMMEDIATE) {
            // Count the bytes written by the immediate flush, as onOutput() does for
            // its own flush; otherwise they would be missing from the transport metrics.
            final int bytesWritten = outbuf.flush(ioSession);
            if (bytesWritten > 0) {
                outTransportMetrics.incrementBytesTransferred(bytesWritten);
            }
        }
        ioSession.setEvent(EventMask.WRITE);
    } finally {
        ioSession.getLock().unlock();
    }
}
/** Sets read interest ({@code OP_READ}) on the I/O session. */
void requestSessionInput() {
ioSession.setEvent(SelectionKey.OP_READ);
}
/**
 * Registers an output request and sets write interest ({@code OP_WRITE}) on the I/O session.
 * The request counter is consulted by {@link #onOutput()} before write interest is cleared.
 */
void requestSessionOutput() {
outputRequests.incrementAndGet();
ioSession.setEvent(SelectionKey.OP_WRITE);
}
/** Returns the socket timeout of the I/O session. */
Timeout getSessionTimeout() {
return ioSession.getSocketTimeout();
}
/** Sets the socket timeout of the I/O session. */
void setSessionTimeout(final Timeout timeout) {
ioSession.setSocketTimeout(timeout);
}
/** Clears read interest ({@code OP_READ}) on the I/O session. */
void suspendSessionInput() {
ioSession.clearEvent(SelectionKey.OP_READ);
}
/**
 * Suspends output on the session.
 * <p>
 * If the output buffer still holds data, it is flushed to the session and write
 * interest is left untouched so that any remainder can be written by a later output
 * event. Write interest is cleared only when the buffer is already empty.
 * The operation runs under the I/O session lock.
 *
 * @throws IOException in case of an I/O error while flushing.
 */
void suspendSessionOutput() throws IOException {
    ioSession.getLock().lock();
    try {
        if (outbuf.hasData()) {
            // Count the flushed bytes, as onOutput() does for its own flush; otherwise
            // they would be missing from the transport metrics.
            final int bytesWritten = outbuf.flush(ioSession);
            if (bytesWritten > 0) {
                outTransportMetrics.incrementBytesTransferred(bytesWritten);
            }
        } else {
            ioSession.clearEvent(SelectionKey.OP_WRITE);
        }
    } finally {
        ioSession.getLock().unlock();
    }
}
/**
 * Writes body content of the current outgoing message through its content encoder.
 * Write interest is set on the I/O session when the encoder accepted any bytes.
 * The operation runs under the I/O session lock.
 *
 * @param src the content to write.
 * @return the number of bytes reported as written by the content encoder.
 * @throws ClosedChannelException if there is no outgoing message in progress.
 * @throws IOException in case of an I/O error.
 */
int streamOutput(final ByteBuffer src) throws IOException {
    ioSession.getLock().lock();
    try {
        if (outgoingMessage == null) {
            throw new ClosedChannelException();
        }
        final int written = outgoingMessage.getBody().write(src);
        if (written > 0) {
            ioSession.setEvent(SelectionKey.OP_WRITE);
        }
        return written;
    } finally {
        ioSession.getLock().unlock();
    }
}
/**
 * Result of {@link #endOutputStream(List)}: {@code NONE} when there was no outgoing
 * message in progress, {@code CHUNK_CODED} when the completed encoder was a
 * {@code ChunkEncoder}, and {@code MESSAGE_HEAD} for any other encoder.
 */
enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}

/**
 * Completes the body of the current outgoing message, if there is one.
 * <p>
 * The content encoder is completed with the given trailers, write interest is set on
 * the I/O session and the outgoing message is cleared. The operation runs under the
 * I/O session lock.
 *
 * @param trailers trailers passed to the content encoder on completion.
 * @return which kind of encoder, if any, was completed.
 * @throws IOException in case of an I/O error.
 */
MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
    ioSession.getLock().lock();
    try {
        if (outgoingMessage == null) {
            return MessageDelineation.NONE;
        }
        final ContentEncoder encoder = outgoingMessage.getBody();
        encoder.complete(trailers);
        ioSession.setEvent(SelectionKey.OP_WRITE);
        outgoingMessage = null;
        if (encoder instanceof ChunkEncoder) {
            return MessageDelineation.CHUNK_CODED;
        }
        return MessageDelineation.MESSAGE_HEAD;
    } finally {
        ioSession.getLock().unlock();
    }
}
/**
 * Tells whether output is complete: either there is no outgoing message in progress,
 * or its content encoder reports completion. Evaluated under the I/O session lock.
 *
 * @return {@code true} if there is nothing left to encode for the current message.
 */
boolean isOutputCompleted() {
    ioSession.getLock().lock();
    try {
        return outgoingMessage == null || outgoingMessage.getBody().isCompleted();
    } finally {
        ioSession.getLock().unlock();
    }
}
/**
 * Requests a graceful shutdown by enqueuing a graceful shutdown command with normal
 * priority. The command is acted upon when the command queue is processed.
 */
@Override
public void close() throws IOException {
ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
}
/**
 * Requests a shutdown in the given mode by enqueuing a shutdown command with
 * immediate priority.
 *
 * @param closeMode the requested close mode.
 */
@Override
public void close(final CloseMode closeMode) {
ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
}
/**
 * Reports the connection as open only while it is in the {@code ACTIVE} state;
 * a connection that is still {@code READY} or is shutting down is not considered open.
 */
@Override
public boolean isOpen() {
return connState == ConnectionState.ACTIVE;
}
/** Returns the socket timeout of the underlying I/O session. */
@Override
public Timeout getSocketTimeout() {
return ioSession.getSocketTimeout();
}
/** Sets the socket timeout of the underlying I/O session. */
@Override
public void setSocketTimeout(final Timeout timeout) {
ioSession.setSocketTimeout(timeout);
}
/**
 * Returns the endpoint details of this connection, creating them on first use from
 * the session's remote and local addresses, the connection metrics and the session's
 * socket timeout at the time of creation.
 */
@Override
public EndpointDetails getEndpointDetails() {
    EndpointDetails details = endpointDetails;
    if (details == null) {
        details = new BasicEndpointDetails(
                ioSession.getRemoteAddress(),
                ioSession.getLocalAddress(),
                connMetrics,
                ioSession.getSocketTimeout());
        endpointDetails = details;
    }
    return details;
}
/**
 * Returns the protocol version of the most recently parsed incoming message head,
 * or {@code null} if no message head has been parsed yet.
 */
@Override
public ProtocolVersion getProtocolVersion() {
return version;
}
/** Returns the remote address of the underlying I/O session. */
@Override
public SocketAddress getRemoteAddress() {
return ioSession.getRemoteAddress();
}
/** Returns the local address of the underlying I/O session. */
@Override
public SocketAddress getLocalAddress() {
return ioSession.getLocalAddress();
}
/**
 * Returns the SSL session taken from the TLS details of the I/O session,
 * or {@code null} when the session has no TLS details.
 */
@Override
public SSLSession getSSLSession() {
    final TlsDetails tls = ioSession.getTlsDetails();
    if (tls == null) {
        return null;
    }
    return tls.getSSLSession();
}
/**
 * Appends a textual snapshot of the connection state, both session buffers and the
 * current input window (0 when no capacity window exists yet) to the given builder.
 *
 * @param buf the builder to append to.
 */
void appendState(final StringBuilder buf) {
    buf.append("connState=").append(connState);
    buf.append(", inbuf=").append(inbuf);
    buf.append(", outbuf=").append(outbuf);
    buf.append(", inputWindow=");
    buf.append(capacityWindow != null ? capacityWindow.getWindow() : 0);
}
/**
 * Input capacity window of a single incoming message.
 * <p>
 * Increasing the window through {@link #update(int)} sets read interest on the I/O
 * session; reducing it to zero or below through {@link #removeCapacity(int)} clears
 * read interest. All modifications are made while holding an internal lock.
 */
static class CapacityWindow implements CapacityChannel {
    private final IOSession ioSession;
    private final Object lock;
    // Written only while holding 'lock'. Declared volatile because getWindow() reads it
    // without the lock (for example from appendState) and must not observe a stale value.
    private volatile int window;
    private boolean closed;

    /**
     * @param window the initial window size.
     * @param ioSession the I/O session whose read interest this window controls.
     */
    CapacityWindow(final int window, final IOSession ioSession) {
        this.window = window;
        this.ioSession = ioSession;
        this.lock = new Object();
    }

    /**
     * Adds capacity to the window and sets read interest on the I/O session.
     * Has no effect once the window has been closed or when the increment is not positive.
     *
     * @param increment the capacity to add.
     */
    @Override
    public void update(final int increment) throws IOException {
        synchronized (lock) {
            if (closed) {
                return;
            }
            if (increment > 0) {
                updateWindow(increment);
                ioSession.setEvent(SelectionKey.OP_READ);
            }
        }
    }

    /**
     * Internal method for removing capacity. We don't need to check
     * if this channel is closed in it.
     *
     * @param delta the capacity to remove.
     * @return the window size after the removal.
     */
    int removeCapacity(final int delta) {
        synchronized (lock) {
            updateWindow(-delta);
            if (window <= 0) {
                ioSession.clearEvent(SelectionKey.OP_READ);
            }
            return window;
        }
    }

    /**
     * Adds the delta to the window, saturating at {@code Integer.MIN_VALUE} /
     * {@code Integer.MAX_VALUE} instead of overflowing. Callers hold the lock.
     */
    private void updateWindow(final int delta) {
        final int current = window;
        int newValue = current + delta;
        // Same overflow test as Math.addExact: overflow occurred if both operands
        // have a sign that differs from the sign of the result.
        if (((current ^ newValue) & (delta ^ newValue)) < 0) {
            newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
        }
        window = newValue;
    }

    /**
     * Closes the capacity channel, preventing user code from accidentally requesting
     * read events outside of the context of the request the channel was created for
     */
    void close() {
        synchronized (lock) {
            closed = true;
        }
    }

    // visible for testing
    int getWindow() {
        return window;
    }
}
}