blob: be273d1563dfd6865d6a29638eaea33ded3925ea [file] [log] [blame]
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.reactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.Closer;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Timeout;
class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
private static final int MAX_CHANNEL_REQUESTS = 10000;
private final IOEventHandlerFactory eventHandlerFactory;
private final IOReactorConfig reactorConfig;
private final Decorator<IOSession> ioSessionDecorator;
private final IOSessionListener sessionListener;
private final Callback<IOSession> sessionShutdownCallback;
private final Queue<InternalDataChannel> closedSessions;
private final Queue<ChannelEntry> channelQueue;
private final Queue<IOSessionRequest> requestQueue;
private final AtomicBoolean shutdownInitiated;
private final long selectTimeoutMillis;
private volatile long lastTimeoutCheckMillis;
SingleCoreIOReactor(
final Callback<Exception> exceptionCallback,
final IOEventHandlerFactory eventHandlerFactory,
final IOReactorConfig reactorConfig,
final Decorator<IOSession> ioSessionDecorator,
final IOSessionListener sessionListener,
final Callback<IOSession> sessionShutdownCallback) {
super(exceptionCallback);
this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
this.ioSessionDecorator = ioSessionDecorator;
this.sessionListener = sessionListener;
this.sessionShutdownCallback = sessionShutdownCallback;
this.shutdownInitiated = new AtomicBoolean(false);
this.closedSessions = new ConcurrentLinkedQueue<>();
this.channelQueue = new ConcurrentLinkedQueue<>();
this.requestQueue = new ConcurrentLinkedQueue<>();
this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
}
void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
throw new IOReactorShutdownException("I/O reactor has been shut down");
}
this.channelQueue.add(entry);
this.selector.wakeup();
}
@Override
void doTerminate() {
closePendingChannels();
closePendingConnectionRequests();
processClosedSessions();
}
@Override
void doExecute() throws IOException {
while (!Thread.currentThread().isInterrupted()) {
final int readyCount = this.selector.select(this.selectTimeoutMillis);
if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
if (this.shutdownInitiated.compareAndSet(false, true)) {
initiateSessionShutdown();
}
closePendingChannels();
}
if (getStatus() == IOReactorStatus.SHUT_DOWN) {
break;
}
// Process selected I/O events
if (readyCount > 0) {
processEvents(this.selector.selectedKeys());
}
validateActiveChannels();
// Process closed sessions
processClosedSessions();
// If active process new channels
if (getStatus() == IOReactorStatus.ACTIVE) {
processPendingChannels();
processPendingConnectionRequests();
}
// Exit select loop if graceful shutdown has been completed
if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
break;
}
if (getStatus() == IOReactorStatus.SHUT_DOWN) {
break;
}
}
}
private void initiateSessionShutdown() {
if (this.sessionShutdownCallback != null) {
final Set<SelectionKey> keys = this.selector.keys();
for (final SelectionKey key : keys) {
final InternalChannel channel = (InternalChannel) key.attachment();
if (channel instanceof InternalDataChannel) {
this.sessionShutdownCallback.execute((InternalDataChannel) channel);
}
}
}
}
private void validateActiveChannels() {
final long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
this.lastTimeoutCheckMillis = currentTimeMillis;
for (final SelectionKey key : this.selector.keys()) {
checkTimeout(key, currentTimeMillis);
}
}
}
private void processEvents(final Set<SelectionKey> selectedKeys) {
for (final SelectionKey key : selectedKeys) {
final InternalChannel channel = (InternalChannel) key.attachment();
if (channel != null) {
try {
channel.handleIOEvent(key.readyOps());
} catch (final CancelledKeyException ex) {
channel.close(CloseMode.GRACEFUL);
}
}
}
selectedKeys.clear();
}
private void processPendingChannels() throws IOException {
ChannelEntry entry;
for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
final SocketChannel socketChannel = entry.channel;
final Object attachment = entry.attachment;
try {
prepareSocket(socketChannel.socket());
socketChannel.configureBlocking(false);
} catch (final IOException ex) {
logException(ex);
try {
socketChannel.close();
} catch (final IOException ex2) {
logException(ex2);
}
throw ex;
}
final SelectionKey key;
try {
key = socketChannel.register(this.selector, SelectionKey.OP_READ);
} catch (final ClosedChannelException ex) {
return;
}
final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
final InternalDataChannel dataChannel = new InternalDataChannel(
ioSession,
null,
ioSessionDecorator,
sessionListener,
closedSessions);
dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
key.attach(dataChannel);
dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
}
}
private void processClosedSessions() {
for (;;) {
final InternalDataChannel dataChannel = this.closedSessions.poll();
if (dataChannel == null) {
break;
}
try {
dataChannel.disconnected();
} catch (final CancelledKeyException ex) {
// ignore and move on
}
}
}
private void checkTimeout(final SelectionKey key, final long nowMillis) {
final InternalChannel channel = (InternalChannel) key.attachment();
if (channel != null) {
channel.checkTimeout(nowMillis);
}
}
@Override
public Future<IOSession> connect(
final NamedEndpoint remoteEndpoint,
final SocketAddress remoteAddress,
final SocketAddress localAddress,
final Timeout timeout,
final Object attachment,
final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
Args.notNull(remoteEndpoint, "Remote endpoint");
final IOSessionRequest sessionRequest = new IOSessionRequest(
remoteEndpoint,
remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
localAddress,
timeout,
attachment,
callback);
this.requestQueue.add(sessionRequest);
this.selector.wakeup();
return sessionRequest;
}
private void prepareSocket(final Socket socket) throws IOException {
socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
if (this.reactorConfig.getSndBufSize() > 0) {
socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
}
if (this.reactorConfig.getRcvBufSize() > 0) {
socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
}
if (this.reactorConfig.getTrafficClass() > 0) {
socket.setTrafficClass(this.reactorConfig.getTrafficClass());
}
final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
if (linger >= 0) {
socket.setSoLinger(true, linger);
}
}
private void validateAddress(final SocketAddress address) throws UnknownHostException {
if (address instanceof InetSocketAddress) {
final InetSocketAddress endpoint = (InetSocketAddress) address;
if (endpoint.isUnresolved()) {
throw new UnknownHostException(endpoint.getHostName());
}
}
}
private void processPendingConnectionRequests() {
IOSessionRequest sessionRequest;
for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
if (!sessionRequest.isCancelled()) {
final SocketChannel socketChannel;
try {
socketChannel = SocketChannel.open();
} catch (final IOException ex) {
sessionRequest.failed(ex);
return;
}
try {
processConnectionRequest(socketChannel, sessionRequest);
} catch (final IOException | SecurityException ex) {
Closer.closeQuietly(socketChannel);
sessionRequest.failed(ex);
}
}
}
}
private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
validateAddress(sessionRequest.localAddress);
validateAddress(sessionRequest.remoteAddress);
socketChannel.configureBlocking(false);
prepareSocket(socketChannel.socket());
if (sessionRequest.localAddress != null) {
final Socket sock = socketChannel.socket();
sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
sock.bind(sessionRequest.localAddress);
}
final SocketAddress targetAddress;
final IOEventHandlerFactory eventHandlerFactory;
if (this.reactorConfig.getSocksProxyAddress() != null) {
targetAddress = this.reactorConfig.getSocksProxyAddress();
eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
sessionRequest.remoteAddress,
this.reactorConfig.getSocksProxyUsername(),
this.reactorConfig.getSocksProxyPassword(),
this.eventHandlerFactory);
} else {
targetAddress = sessionRequest.remoteAddress;
eventHandlerFactory = this.eventHandlerFactory;
}
// Run this under a doPrivileged to support lib users that run under a SecurityManager this allows granting connect permissions
// only to this library
final boolean connected;
try {
connected = AccessController.doPrivileged(
(PrivilegedExceptionAction<Boolean>) () -> socketChannel.connect(targetAddress));
} catch (final PrivilegedActionException e) {
Asserts.check(e.getCause() instanceof IOException,
"method contract violation only checked exceptions are wrapped: " + e.getCause());
// only checked exceptions are wrapped - error and RTExceptions are rethrown by doPrivileged
throw (IOException) e.getCause();
}
final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, (k, sc, namedEndpoint, attachment) -> {
final IOSession ioSession = new IOSessionImpl("c", k, sc);
final InternalDataChannel dataChannel = new InternalDataChannel(
ioSession,
namedEndpoint,
ioSessionDecorator,
sessionListener,
closedSessions);
dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
return dataChannel;
});
if (connected) {
channel.handleIOEvent(SelectionKey.OP_CONNECT);
} else {
key.attach(channel);
sessionRequest.assign(channel);
}
}
private void closePendingChannels() {
ChannelEntry entry;
while ((entry = this.channelQueue.poll()) != null) {
final SocketChannel socketChannel = entry.channel;
try {
socketChannel.close();
} catch (final IOException ex) {
logException(ex);
}
}
}
private void closePendingConnectionRequests() {
IOSessionRequest sessionRequest;
while ((sessionRequest = this.requestQueue.poll()) != null) {
sessionRequest.cancel();
}
}
}