blob: 33ed80d71f983307eea3c13f092302c3d5ece6f9 [file] [log] [blame]
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.reactor;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import javax.net.ssl.SSLContext;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
import org.apache.hc.core5.reactor.ssl.SSLIOSession;
import org.apache.hc.core5.reactor.ssl.SSLMode;
import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Timeout;
final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
private final IOSession ioSession;
private final NamedEndpoint initialEndpoint;
private final Decorator<IOSession> ioSessionDecorator;
private final IOSessionListener sessionListener;
private final Queue<InternalDataChannel> closedSessions;
private final AtomicReference<SSLIOSession> tlsSessionRef;
private final AtomicReference<IOSession> currentSessionRef;
private final AtomicBoolean closed;
InternalDataChannel(
final IOSession ioSession,
final NamedEndpoint initialEndpoint,
final Decorator<IOSession> ioSessionDecorator,
final IOSessionListener sessionListener,
final Queue<InternalDataChannel> closedSessions) {
this.ioSession = ioSession;
this.initialEndpoint = initialEndpoint;
this.closedSessions = closedSessions;
this.ioSessionDecorator = ioSessionDecorator;
this.sessionListener = sessionListener;
this.tlsSessionRef = new AtomicReference<>(null);
this.currentSessionRef = new AtomicReference<>(
ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
this.closed = new AtomicBoolean(false);
}
@Override
public String getId() {
return ioSession.getId();
}
@Override
public NamedEndpoint getInitialEndpoint() {
return initialEndpoint;
}
@Override
public IOEventHandler getHandler() {
final IOSession currentSession = currentSessionRef.get();
return currentSession.getHandler();
}
@Override
public void upgrade(final IOEventHandler handler) {
final IOSession currentSession = currentSessionRef.get();
currentSession.upgrade(handler);
}
private IOEventHandler ensureHandler(final IOSession session) {
final IOEventHandler handler = session.getHandler();
Asserts.notNull(handler, "IO event handler");
return handler;
}
@Override
void onIOEvent(final int readyOps) throws IOException {
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
final IOSession currentSession = currentSessionRef.get();
currentSession.clearEvent(SelectionKey.OP_CONNECT);
if (tlsSessionRef.get() == null) {
if (sessionListener != null) {
sessionListener.connected(currentSession);
}
final IOEventHandler handler = ensureHandler(currentSession);
handler.connected(currentSession);
}
}
if ((readyOps & SelectionKey.OP_READ) != 0) {
final IOSession currentSession = currentSessionRef.get();
currentSession.updateReadTime();
if (sessionListener != null) {
sessionListener.inputReady(currentSession);
}
final IOEventHandler handler = ensureHandler(currentSession);
handler.inputReady(currentSession, null);
}
if ((readyOps & SelectionKey.OP_WRITE) != 0
|| (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
final IOSession currentSession = currentSessionRef.get();
currentSession.updateWriteTime();
if (sessionListener != null) {
sessionListener.outputReady(currentSession);
}
final IOEventHandler handler = ensureHandler(currentSession);
handler.outputReady(currentSession);
}
}
@Override
Timeout getTimeout() {
final IOSession currentSession = currentSessionRef.get();
return currentSession.getSocketTimeout();
}
@Override
void onTimeout(final Timeout timeout) throws IOException {
final IOSession currentSession = currentSessionRef.get();
if (sessionListener != null) {
sessionListener.timeout(currentSession);
}
final IOEventHandler handler = ensureHandler(currentSession);
handler.timeout(currentSession, timeout);
}
@Override
void onException(final Exception cause) {
final IOSession currentSession = currentSessionRef.get();
if (sessionListener != null) {
sessionListener.exception(currentSession, cause);
}
final IOEventHandler handler = currentSession.getHandler();
if (handler != null) {
handler.exception(currentSession, cause);
}
}
void onTLSSessionStart(final SSLIOSession sslSession) {
final IOSession currentSession = currentSessionRef.get();
if (sessionListener != null) {
sessionListener.connected(currentSession);
}
}
void onTLSSessionEnd(final SSLIOSession sslSession) {
if (closed.compareAndSet(false, true)) {
closedSessions.add(this);
}
}
void disconnected() {
final IOSession currentSession = currentSessionRef.get();
if (sessionListener != null) {
sessionListener.disconnected(currentSession);
}
final IOEventHandler handler = currentSession.getHandler();
if (handler != null) {
handler.disconnected(currentSession);
}
}
@Override
public void startTls(
final SSLContext sslContext,
final NamedEndpoint endpoint,
final SSLBufferMode sslBufferMode,
final SSLSessionInitializer initializer,
final SSLSessionVerifier verifier,
final Timeout handshakeTimeout) {
final SSLIOSession sslioSession = new SSLIOSession(
endpoint != null ? endpoint : initialEndpoint,
ioSession,
initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
sslContext,
sslBufferMode,
initializer,
verifier,
this::onTLSSessionStart,
this::onTLSSessionEnd,
handshakeTimeout);
if (tlsSessionRef.compareAndSet(null, sslioSession)) {
currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
if (sessionListener != null) {
sessionListener.startTls(sslioSession);
}
} else {
throw new IllegalStateException("TLS already activated");
}
}
@SuppressWarnings("resource")
@Override
public TlsDetails getTlsDetails() {
final SSLIOSession sslIoSession = tlsSessionRef.get();
return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
}
@Override
public Lock getLock() {
return ioSession.getLock();
}
@Override
public void close() {
close(CloseMode.GRACEFUL);
}
@Override
public void close(final CloseMode closeMode) {
final IOSession currentSession = currentSessionRef.get();
if (closeMode == CloseMode.IMMEDIATE) {
closed.set(true);
currentSession.close(closeMode);
} else {
if (closed.compareAndSet(false, true)) {
try {
currentSession.close(closeMode);
} finally {
closedSessions.add(this);
}
}
}
}
@Override
public IOSession.Status getStatus() {
final IOSession currentSession = currentSessionRef.get();
return currentSession.getStatus();
}
@Override
public boolean isOpen() {
final IOSession currentSession = currentSessionRef.get();
return currentSession.isOpen();
}
@Override
public void enqueue(final Command command, final Command.Priority priority) {
final IOSession currentSession = currentSessionRef.get();
currentSession.enqueue(command, priority);
}
@Override
public boolean hasCommands() {
final IOSession currentSession = currentSessionRef.get();
return currentSession.hasCommands();
}
@Override
public Command poll() {
final IOSession currentSession = currentSessionRef.get();
return currentSession.poll();
}
@Override
public ByteChannel channel() {
final IOSession currentSession = currentSessionRef.get();
return currentSession.channel();
}
@Override
public SocketAddress getRemoteAddress() {
return ioSession.getRemoteAddress();
}
@Override
public SocketAddress getLocalAddress() {
return ioSession.getLocalAddress();
}
@Override
public int getEventMask() {
final IOSession currentSession = currentSessionRef.get();
return currentSession.getEventMask();
}
@Override
public void setEventMask(final int ops) {
final IOSession currentSession = currentSessionRef.get();
currentSession.setEventMask(ops);
}
@Override
public void setEvent(final int op) {
final IOSession currentSession = currentSessionRef.get();
currentSession.setEvent(op);
}
@Override
public void clearEvent(final int op) {
final IOSession currentSession = currentSessionRef.get();
currentSession.clearEvent(op);
}
@Override
public Timeout getSocketTimeout() {
return ioSession.getSocketTimeout();
}
@Override
public void setSocketTimeout(final Timeout timeout) {
ioSession.setSocketTimeout(timeout);
}
@Override
public int read(final ByteBuffer dst) throws IOException {
final IOSession currentSession = currentSessionRef.get();
return currentSession.read(dst);
}
@Override
public int write(final ByteBuffer src) throws IOException {
final IOSession currentSession = currentSessionRef.get();
return currentSession.write(src);
}
@Override
public void updateReadTime() {
ioSession.updateReadTime();
}
@Override
public void updateWriteTime() {
ioSession.updateWriteTime();
}
@Override
public long getLastReadTime() {
return ioSession.getLastReadTime();
}
@Override
public long getLastWriteTime() {
return ioSession.getLastWriteTime();
}
@Override
public long getLastEventTime() {
return ioSession.getLastEventTime();
}
@Override
public String toString() {
final IOSession currentSession = currentSessionRef.get();
if (currentSession != null) {
return currentSession.toString();
} else {
return ioSession.toString();
}
}
}