blob: 5ec8cc242e5f0d666a44a6dbdfd43112f8a132c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.mina.session;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import org.apache.mina.api.IdleStatus;
import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoHandler;
import org.apache.mina.api.IoServer;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
import org.apache.mina.api.IoSessionConfig;
import org.apache.mina.filterchain.ReadFilterChainController;
import org.apache.mina.filterchain.WriteFilterChainController;
import org.apache.mina.service.executor.CloseEvent;
import org.apache.mina.service.executor.HandshakeCompletedEvent;
import org.apache.mina.service.executor.HandshakeStartedEvent;
import org.apache.mina.service.executor.IdleEvent;
import org.apache.mina.service.executor.IoHandlerExecutor;
import org.apache.mina.service.executor.OpenEvent;
import org.apache.mina.service.executor.ReceiveEvent;
import org.apache.mina.service.executor.SecureClosedEvent;
import org.apache.mina.service.executor.SentEvent;
import org.apache.mina.service.idlechecker.IdleChecker;
import org.apache.mina.transport.nio.SelectorLoop;
import org.apache.mina.transport.nio.SslHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base implementation of {@link IoSession} shared with all the different transports.
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractIoSession implements IoSession, ReadFilterChainController, WriteFilterChainController {
/** Logger shared by every session instance. */
private static final Logger LOG = LoggerFactory.getLogger(AbstractIoSession.class);
// Debug flag sampled once, when this class is initialised, so that hot paths can skip the
// logger call entirely. A later change of the logger level is not reflected here.
private static final boolean IS_DEBUG = LOG.isDebugEnabled();
/**
 * Generator of session identifiers.
 * NOTE(review): this is a 32-bit counter while the session id is stored as a {@code long};
 * it wraps to negative values after 2^31 sessions.
 */
private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
/** The session's unique identifier, taken from {@link #NEXT_ID} at construction time. */
private final long id;
/** The session's creation time, as returned by {@link System#currentTimeMillis()}. */
private final long creationTime;
/** The service this session is associated with */
private final IoService service;
/** Container holding the attributes attached to this session. */
private final AttributeContainer attributes = new DefaultAttributeContainer();
/** the {@link IdleChecker} in charge of detecting idle event for this session */
protected final IdleChecker idleChecker;
/** The session configuration, obtained from the service when the session is created. */
protected IoSessionConfig config;
// ------------------------------------------------------------------------
// Basic statistics
// ------------------------------------------------------------------------
/** The number of bytes read since this session has been created */
private volatile long readBytes;
/** The number of bytes written since this session has been created */
private volatile long writtenBytes;
/** Last time something was read for this session */
private volatile long lastReadTime;
/** Last time something was written for this session */
private volatile long lastWriteTime;
// ------------------------------------------------------------------------
// Session state
// ------------------------------------------------------------------------
/** The session's state : one of CREATED, CONNECTED, SECURING, SECURED, CLOSING, CLOSED */
protected volatile SessionState state;
/** A lock to protect the access to the session's state */
private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
/** The read (shared) side of {@link #stateLock}, used by the state query methods. */
private final Lock stateReadLock = stateLock.readLock();
/** The write (exclusive) side of {@link #stateLock}, used when the state changes. */
private final Lock stateWriteLock = stateLock.writeLock();
/**
 * Tells if the session is secured or not.
 * NOTE(review): this flag is neither read nor written in this class; {@code isSecured()} relies
 * on the SSL helper attribute instead. Presumably maintained by subclasses - confirm.
 */
protected volatile boolean secured;
// ------------------------------------------------------------------------
// Filter chain
// ------------------------------------------------------------------------
/** The list of {@link IoFilter} implementing this chain, as returned by the service at construction time. */
private final IoFilter[] chain;
/**
 * The current position in the chain while a write travels through the filters.
 * NOTE(review): this is a plain instance field, not per-thread state; presumably it is only
 * touched from the session's I/O thread - confirm.
 */
private int writeChainPosition;
/**
 * The current position in the chain while a read travels through the filters.
 * NOTE(review): plain instance field as well, same threading assumption as above - confirm.
 */
private int readChainPosition;
/**
 * Builds a session bound to the given {@link IoService}. The session receives a fresh identifier
 * (see {@link org.apache.mina.api.IoSession#getId()}), snapshots the service's filters and session
 * configuration, starts in the {@code CREATED} state and registers itself in the service's
 * managed sessions.
 *
 * @param service the service this session is associated with
 * @param idleChecker the checker for idle session
 */
public AbstractIoSession(IoService service, IdleChecker idleChecker) {
    // Identity and bookkeeping first
    this.id = NEXT_ID.getAndIncrement();
    this.creationTime = System.currentTimeMillis();
    this.idleChecker = idleChecker;

    // Everything that is derived from the owning service
    this.service = service;
    this.chain = service.getFilters();
    this.config = service.getSessionConfig();

    if (IS_DEBUG) {
        LOG.debug("Created new session with id : {}", id);
    }

    this.state = SessionState.CREATED;

    // Registration comes last, once all the fields above are initialised
    service.getManagedSessions().put(id, this);
}
// ------------------------------------------------------------------------
// Session State management
// ------------------------------------------------------------------------
/**
 * {@inheritDoc}
 * <p>
 * The state is compared to {@code CLOSED} while holding the read side of the state lock.
 */
@Override
public boolean isClosed() {
    // Acquire the lock BEFORE entering the try block: if lock() itself fails, the finally
    // clause must not try to release a lock that was never taken.
    stateReadLock.lock();
    try {
        return state == SessionState.CLOSED;
    } finally {
        stateReadLock.unlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * The state is compared to {@code CLOSING} while holding the read side of the state lock.
 */
@Override
public boolean isClosing() {
    // Acquire the lock BEFORE entering the try block: if lock() itself fails, the finally
    // clause must not try to release a lock that was never taken.
    stateReadLock.lock();
    try {
        return state == SessionState.CLOSING;
    } finally {
        stateReadLock.unlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Only the exact {@code CONNECTED} state qualifies: a session which is in the {@code SECURING}
 * or {@code SECURED} state makes this method return <code>false</code>. The state is read while
 * holding the read side of the state lock.
 */
@Override
public boolean isConnected() {
    // Acquire the lock BEFORE entering the try block: if lock() itself fails, the finally
    // clause must not try to release a lock that was never taken.
    stateReadLock.lock();
    try {
        return state == SessionState.CONNECTED;
    } finally {
        stateReadLock.unlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * The state is compared to {@code CREATED}, the state assigned by the constructor, while holding
 * the read side of the state lock.
 */
@Override
public boolean isCreated() {
    // Acquire the lock BEFORE entering the try block: if lock() itself fails, the finally
    // clause must not try to release a lock that was never taken.
    stateReadLock.lock();
    try {
        return state == SessionState.CREATED;
    } finally {
        stateReadLock.unlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * The state is compared to {@code SECURING} while holding the read side of the state lock.
 */
@Override
public boolean isSecuring() {
    // Acquire the lock BEFORE entering the try block: if lock() itself fails, the finally
    // clause must not try to release a lock that was never taken.
    stateReadLock.lock();
    try {
        return state == SessionState.SECURING;
    } finally {
        stateReadLock.unlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * The state is compared to {@code SECURED} while holding the read side of the state lock.
 */
@Override
public boolean isConnectedSecured() {
    // Acquire the lock BEFORE entering the try block: if lock() itself fails, the finally
    // clause must not try to release a lock that was never taken.
    stateReadLock.lock();
    try {
        return state == SessionState.SECURED;
    } finally {
        stateReadLock.unlock();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * The transition is validated and applied while holding the write side of the state lock. The
 * accepted transitions are :
 * <ul>
 * <li>CREATED : to CONNECTED, SECURING or CLOSING</li>
 * <li>CONNECTED : to SECURING or CLOSING</li>
 * <li>SECURING : to SECURED or CLOSING</li>
 * <li>SECURED : to CONNECTED, SECURING or CLOSING</li>
 * <li>CLOSING : to CLOSED only</li>
 * <li>CLOSED : none, the state is terminal</li>
 * </ul>
 *
 * @param to the state to switch to
 * @exception IllegalStateException if the requested transition is not one of those listed above
 */
@Override
public void changeState(SessionState to) {
    // Acquire the lock BEFORE entering the try block: if lock() itself fails, the finally
    // clause must not try to release a lock that was never taken.
    stateWriteLock.lock();
    try {
        switch (state) {
        case CREATED:
            switch (to) {
            case CONNECTED:
            case SECURING:
            case CLOSING:
                state = to;
                break;
            default:
                throw new IllegalStateException("Cannot transit from " + state + " to " + to);
            }
            break;
        case CONNECTED:
            switch (to) {
            case SECURING:
            case CLOSING:
                state = to;
                break;
            default:
                throw new IllegalStateException("Cannot transit from " + state + " to " + to);
            }
            break;
        case SECURING:
            switch (to) {
            case SECURED:
            case CLOSING:
                state = to;
                break;
            default:
                throw new IllegalStateException("Cannot transit from " + state + " to " + to);
            }
            break;
        case SECURED:
            switch (to) {
            case CONNECTED:
            case SECURING:
            case CLOSING:
                state = to;
                break;
            default:
                throw new IllegalStateException("Cannot transit from " + state + " to " + to);
            }
            break;
        case CLOSING:
            // The only way out of CLOSING is CLOSED
            if (to != SessionState.CLOSED) {
                throw new IllegalStateException("Cannot transit from " + state + " to " + to);
            }
            state = to;
            break;
        case CLOSED:
            // Terminal state
            throw new IllegalStateException("The session is already closed. cannot switch to " + to);
        }
    } finally {
        stateWriteLock.unlock();
    }
}
// ------------------------------------------------------------------------
// SSL/TLS session state management
// ------------------------------------------------------------------------
/**
 * {@inheritDoc}
 * <p>
 * The answer is derived from the {@link SslHelper} stored in the session's attributes : the
 * session is secured when such a helper exists and reports itself as active.
 */
@Override
public boolean isSecured() {
    final SslHelper sslHelper = attributes.getAttribute(SSL_HELPER);
    if (sslHelper == null) {
        // No helper installed : initSecure() has not stored one
        return false;
    }
    return sslHelper.isActive();
}
/**
 * {@inheritDoc}
 * <p>
 * A new {@link SslHelper} is created for this session, initialised, and then stored in the
 * session's attributes under the {@code SSL_HELPER} key.
 *
 * @param sslContext the SSL context handed to the helper
 */
@Override
public void initSecure(SSLContext sslContext) {
    final SslHelper helper = new SslHelper(this, sslContext);
    // The helper is only published in the attributes once it has been initialised
    helper.init();
    attributes.setAttribute(SSL_HELPER, helper);
}
/**
 * {@inheritDoc}
 * <p>
 * The identifier is assigned once, by the constructor, and never changes afterwards.
 */
@Override
public long getId() {
    return this.id;
}
/**
 * {@inheritDoc}
 * <p>
 * The value is the {@link System#currentTimeMillis()} reading taken by the constructor.
 */
@Override
public long getCreationTime() {
    return this.creationTime;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the current value of the volatile read counter.
 */
@Override
public long getReadBytes() {
    return this.readBytes;
}
/**
 * To be called by the internal plumber when some bytes are written on the socket. The written
 * bytes counter is increased and, when at least one byte was written, the last write time is
 * refreshed so that {@link #getLastWriteTime()} and {@link #getLastIoTime()} reflect the write
 * activity (the read path maintains its own counter and timestamp the same way).
 * <p>
 * NOTE(review): the update of the volatile counter is a read-modify-write and is therefore not
 * atomic; presumably this method is only called from the session's I/O thread - confirm.
 *
 * @param bytesCount number of extra bytes written
 */
public void incrementWrittenBytes(int bytesCount) {
    writtenBytes += bytesCount;

    if (bytesCount > 0) {
        // Something was really written : remember when
        lastWriteTime = System.currentTimeMillis();
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the current value of the volatile written counter.
 */
@Override
public long getWrittenBytes() {
    return this.writtenBytes;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the current value of the volatile last read timestamp.
 */
@Override
public long getLastReadTime() {
    return this.lastReadTime;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the current value of the volatile last write timestamp.
 */
@Override
public long getLastWriteTime() {
    return this.lastWriteTime;
}
/**
 * {@inheritDoc}
 * <p>
 * This is the most recent of the last read time and the last write time.
 */
@Override
public final long getLastIoTime() {
    final long lastRead = lastReadTime;
    final long lastWrite = lastWriteTime;

    return Math.max(lastRead, lastWrite);
}
/**
 * {@inheritDoc}
 * <p>
 * This is the service that was given to the constructor.
 */
@Override
public IoService getService() {
    return this.service;
}
/**
 * {@inheritDoc}
 * <p>
 * The lookup is delegated to the session's attribute container.
 *
 * @exception IllegalArgumentException if <code>key==null</code>
 * @see #setAttribute(AttributeKey, Object)
 */
@Override
public final <T> T getAttribute(AttributeKey<T> key, T defaultValue) {
    final T value = attributes.getAttribute(key, defaultValue);

    return value;
}
/**
 * {@inheritDoc}
 * <p>
 * The lookup is delegated to the session's attribute container.
 *
 * @exception IllegalArgumentException if <code>key==null</code>
 * @see #setAttribute(AttributeKey, Object)
 */
@Override
public final <T> T getAttribute(AttributeKey<T> key) {
    final T value = attributes.getAttribute(key);

    return value;
}
/**
 * {@inheritDoc}
 * <p>
 * The update is delegated to the session's attribute container, whose result is returned as is.
 *
 * @exception IllegalArgumentException <ul>
 * <li>
 * if <code>key==null</code></li>
 * <li>
 * if <code>value</code> is not <code>null</code> and not an instance of type that is specified in by the
 * given <code>key</code> (see {@link AttributeKey#getType()})</li>
 * </ul>
 *
 * @see #getAttribute(AttributeKey)
 */
@Override
public final <T> T setAttribute(AttributeKey<? extends T> key, T value) {
    return attributes.setAttribute(key, value);
}
/**
 * {@inheritDoc}
 * <p>
 * The set is provided by the session's attribute container.
 *
 * @see Collections#unmodifiableSet(Set)
 */
@Override
public Set<AttributeKey<?>> getAttributeKeys() {
    final Set<AttributeKey<?>> keys = attributes.getAttributeKeys();

    return keys;
}
/**
 * {@inheritDoc}
 * <p>
 * The removal is delegated to the session's attribute container, whose result is returned as is.
 *
 * @exception IllegalArgumentException if <code>key==null</code>
 */
@Override
public <T> T removeAttribute(AttributeKey<T> key) {
    final T removed = attributes.removeAttribute(key);

    return removed;
}
// ----------------------------------------------------
// Write management
// ----------------------------------------------------
/**
 * {@inheritDoc}
 * <p>
 * Fire-and-forget flavour of the write : no future is associated with the message.
 */
@Override
public void write(Object message) {
    final IoFuture<Void> noFuture = null;

    doWriteWithFuture(message, noFuture);
}
/**
 * {@inheritDoc}
 * <p>
 * A new {@link DefaultWriteFuture} is created, handed over to the write path together with the
 * message, and returned to the caller.
 */
@Override
public IoFuture<Void> writeWithFuture(Object message) {
    final IoFuture<Void> writeFuture = new DefaultWriteFuture();
    doWriteWithFuture(message, writeFuture);

    return writeFuture;
}
/**
 * Common implementation of the two public write methods : wraps the message in a
 * {@link DefaultWriteRequest} and pushes it through the write path, unless the session is
 * closing or closed, in which case an error is logged and the message is dropped.
 * <p>
 * NOTE(review): when the message is dropped the given future is left untouched, so a caller
 * waiting on it is presumably never notified - confirm and complete/fail the future if needed.
 *
 * @param message the message to write
 * @param future the future to associate with the write, or <code>null</code> if there is none
 */
private void doWriteWithFuture(Object message, IoFuture<Void> future) {
    if (IS_DEBUG) {
        LOG.debug("writing message {} to session {}", message, this);
    }

    if ((state != SessionState.CLOSED) && (state != SessionState.CLOSING)) {
        // The session can still accept messages : let the chain process this one
        processMessageWriting(new DefaultWriteRequest(message), future);
        return;
    }

    LOG.error("writing to closed or closing session, the message is discarded");
}
// ------------------------------------------------------------------------
// Event processing using the filter chain
// ------------------------------------------------------------------------
/**
 * Forwards an exception caught while processing an event to the service's {@link IoHandler}.
 * Nothing is done when the service has no handler.
 *
 * @param t the caught exception
 */
protected void processException(Exception t) {
    if (IS_DEBUG) {
        LOG.debug("caught session exception ", t);
    }

    final IoHandler target = getService().getIoHandler();

    if (target == null) {
        return;
    }

    target.exceptionCaught(this, t);
}
/**
 * Propagates the session opened event : every filter of the chain is notified, in chain order,
 * then the {@link IoHandler} (if any), either through the service's {@link IoHandlerExecutor}
 * or directly in the calling thread when there is no executor. A {@link RuntimeException}
 * raised on the way is routed to {@link #processException(Exception)}. To be called by the
 * session {@link SelectorLoop}.
 */
public void processSessionOpen() {
    if (IS_DEBUG) {
        LOG.debug("processing session open event");
    }

    try {
        for (int i = 0; i < chain.length; i++) {
            chain[i].sessionOpened(this);
        }

        final IoHandler ioHandler = getService().getIoHandler();

        if (ioHandler == null) {
            // nobody to notify
            return;
        }

        final IoHandlerExecutor ioExecutor = getService().getIoHandlerExecutor();

        if (ioExecutor == null) {
            // synchronous call (in the I/O loop)
            ioHandler.sessionOpened(this);
        } else {
            // asynchronous event
            ioExecutor.execute(new OpenEvent(this));
        }
    } catch (RuntimeException failure) {
        processException(failure);
    }
}
/**
 * Propagates the session closed event : every filter of the chain is notified, in chain order,
 * then the {@link IoHandler} (if any), either through the service's {@link IoHandlerExecutor}
 * or directly in the calling thread when there is no executor. A {@link RuntimeException}
 * raised on the way is only logged. In every case the session is finally removed from the
 * service's managed sessions. To be called by the session {@link SelectorLoop}.
 */
public void processSessionClosed() {
    if (IS_DEBUG) {
        LOG.debug("processing session closed event");
    }

    try {
        for (int i = 0; i < chain.length; i++) {
            chain[i].sessionClosed(this);
        }

        final IoHandler ioHandler = getService().getIoHandler();

        if (ioHandler != null) {
            final IoHandlerExecutor ioExecutor = getService().getIoHandlerExecutor();

            if (ioExecutor == null) {
                // synchronous call (in the I/O loop)
                ioHandler.sessionClosed(this);
            } else {
                // asynchronous event
                ioExecutor.execute(new CloseEvent(this));
            }
        }
    } catch (RuntimeException failure) {
        LOG.error("Exception while closing the session : ", failure);
    }

    // The session is over : the service does not have to track it anymore
    service.getManagedSessions().remove(id);
}
/**
 * Propagates a session idle event : every filter of the chain is notified, in chain order,
 * then the {@link IoHandler} (if any), either through the service's {@link IoHandlerExecutor}
 * or directly in the calling thread when there is no executor. A {@link RuntimeException}
 * raised on the way is routed to {@link #processException(Exception)}. To be called by the
 * session {@link SelectorLoop}.
 *
 * @param status the idle status to propagate
 */
public void processSessionIdle(IdleStatus status) {
    if (IS_DEBUG) {
        LOG.debug("processing session idle {} event for session {}", status, this);
    }

    try {
        for (int i = 0; i < chain.length; i++) {
            chain[i].sessionIdle(this, status);
        }

        final IoHandler ioHandler = getService().getIoHandler();

        if (ioHandler == null) {
            // nobody to notify
            return;
        }

        final IoHandlerExecutor ioExecutor = getService().getIoHandlerExecutor();

        if (ioExecutor == null) {
            // synchronous call (in the I/O loop)
            ioHandler.sessionIdle(this, status);
        } else {
            // asynchronous event
            ioExecutor.execute(new IdleEvent(this, status));
        }
    } catch (RuntimeException failure) {
        processException(failure);
    }
}
/**
 * Remembers, per thread, the buffer last handed to the message received processing, so that the
 * end of the read chain can tell whether the message it gets is that very buffer. A thread which
 * never stored anything sees <code>null</code> (the default initial value of a {@link ThreadLocal}).
 */
static ThreadLocal<ByteBuffer> tl = new ThreadLocal<ByteBuffer>();
/**
 * Process session message received event using the filter chain. To be called by the session
 * {@link SelectorLoop}.
 * <p>
 * The buffer is first remembered in the thread local slot and the read statistics are updated.
 * With an empty chain the {@link IoHandler} (if any) is notified right away : directly when the
 * service has no {@link IoHandlerExecutor}, otherwise through the executor with a private copy
 * of the buffer. With a non empty chain the first filter is invoked and is expected to call the
 * following ones through the filter chain controller. A {@link RuntimeException} raised on the
 * way is routed to {@link #processException(Exception)}.
 *
 * @param message the received message
 */
public void processMessageReceived(ByteBuffer message) {
    if (IS_DEBUG) {
        LOG.debug("processing message '{}' received event for session {}", message, this);
    }

    tl.set(message);

    try {
        // save basic statistics
        readBytes += message.remaining();
        lastReadTime = System.currentTimeMillis();

        if (chain.length < 1) {
            if (IS_DEBUG) {
                LOG.debug("Nothing to do, the chain is empty");
            }

            IoHandler handler = getService().getIoHandler();

            if (handler != null) {
                IoHandlerExecutor executor = getService().getIoHandlerExecutor();

                if (executor != null) {
                    // asynchronous event : the handler will run later, so it must get its own
                    // copy of the bytes
                    if (IS_DEBUG) {
                        LOG.debug("copying bytebuffer before pushing to the executor");
                    }

                    // copy from the beginning, up to the limit
                    message.rewind();

                    // Only limit() bytes are copied : size the copy on that instead of the
                    // capacity of the source buffer, which can be much larger than its content
                    ByteBuffer clone = ByteBuffer.allocate(message.limit());
                    clone.put(message);
                    message.rewind();
                    clone.flip();

                    executor.execute(new ReceiveEvent(this, clone));
                } else {
                    // synchronous call (in the I/O loop)
                    handler.messageReceived(this, message);
                }
            }
        } else {
            readChainPosition = 0;

            // we call the first filter, it's supposed to call the next ones using the filter chain controller
            chain[readChainPosition].messageReceived(this, message, this);
        }
    } catch (RuntimeException e) {
        processException(e);
    }
}
/**
 * Process session message writing event using the filter chain.
 * <p>
 * The future (if any) is attached to the write request <b>before</b> the request is handed over,
 * so that whoever consumes the request is guaranteed to see it. With an empty chain the request
 * is enqueued directly; otherwise the last filter of the chain is invoked and is expected to
 * call the previous ones through the filter chain controller. A {@link RuntimeException} raised
 * on the way is routed to {@link #processException(Exception)}.
 *
 * @param writeRequest the request carrying the message to write, should be transformed into
 *        ByteBuffer at the end of the filter chain
 * @param future the future to associate with the write request, or <code>null</code> if the
 *        caller does not need one
 */
public void processMessageWriting(WriteRequest writeRequest, IoFuture<Void> future) {
    if (IS_DEBUG) {
        LOG.debug("processing message '{}' writing event for session {}", writeRequest, this);
    }

    try {
        // Once the request has been enqueued (or pushed into the chain) it may be consumed at
        // any time : a future attached afterwards could be missed and never be completed.
        if (future != null) {
            writeRequest.setFuture(future);
        }

        if (chain.length < 1) {
            enqueueWriteRequest(writeRequest);
        } else {
            // the write chain is walked backward, starting from the last filter
            writeChainPosition = chain.length - 1;
            chain[writeChainPosition].messageWriting(this, writeRequest, this);
        }
    } catch (RuntimeException e) {
        processException(e);
    }
}
/**
 * Propagates a message sent event : every filter of the chain is notified, from the last one to
 * the first one, then the {@link IoHandler} (if any), either through the service's
 * {@link IoHandlerExecutor} or directly in the calling thread when there is no executor. A
 * {@link RuntimeException} raised on the way is routed to {@link #processException(Exception)}.
 *
 * @param highLevelMessage the message which has been sent
 */
public void processMessageSent(Object highLevelMessage) {
    if (IS_DEBUG) {
        LOG.debug("processing message '{}' sent event for session {}", highLevelMessage, this);
    }

    try {
        // backward walk of the chain
        for (int i = chain.length - 1; i >= 0; i--) {
            chain[i].messageSent(this, highLevelMessage);
        }

        final IoHandler ioHandler = getService().getIoHandler();

        if (ioHandler == null) {
            // nobody to notify
            return;
        }

        final IoHandlerExecutor ioExecutor = getService().getIoHandlerExecutor();

        if (ioExecutor == null) {
            // synchronous call (in the I/O loop)
            ioHandler.messageSent(this, highLevelMessage);
        } else {
            // asynchronous event
            ioExecutor.execute(new SentEvent(this, highLevelMessage));
        }
    } catch (RuntimeException failure) {
        processException(failure);
    }
}
/**
 * Propagates a handshake started event : every filter of the chain is notified, from the last
 * one to the first one, then the {@link IoHandler} (if any), either through the service's
 * {@link IoHandlerExecutor} or directly in the calling thread when there is no executor. A
 * {@link RuntimeException} raised on the way is routed to {@link #processException(Exception)}.
 */
public void processHandshakeStarted() {
    if (IS_DEBUG) {
        LOG.debug("processing handshake started event for session {}", this);
    }

    try {
        // backward walk of the chain
        for (int i = chain.length - 1; i >= 0; i--) {
            chain[i].handshakeStarted(this);
        }

        final IoHandler ioHandler = getService().getIoHandler();

        if (ioHandler == null) {
            // nobody to notify
            return;
        }

        final IoHandlerExecutor ioExecutor = getService().getIoHandlerExecutor();

        if (ioExecutor == null) {
            // synchronous call (in the I/O loop)
            ioHandler.handshakeStarted(this);
        } else {
            // asynchronous event
            ioExecutor.execute(new HandshakeStartedEvent(this));
        }
    } catch (RuntimeException failure) {
        processException(failure);
    }
}
/**
 * Propagates a handshake completed event : every filter of the chain is notified, from the last
 * one to the first one, then the {@link IoHandler} (if any), either through the service's
 * {@link IoHandlerExecutor} or directly in the calling thread when there is no executor. A
 * {@link RuntimeException} raised on the way is routed to {@link #processException(Exception)}.
 */
public void processHandshakeCompleted() {
    if (IS_DEBUG) {
        LOG.debug("processing handshake completed event for session {}", this);
    }

    try {
        // backward walk of the chain
        for (int i = chain.length - 1; i >= 0; i--) {
            chain[i].handshakeCompleted(this);
        }

        final IoHandler ioHandler = getService().getIoHandler();

        if (ioHandler == null) {
            // nobody to notify
            return;
        }

        final IoHandlerExecutor ioExecutor = getService().getIoHandlerExecutor();

        if (ioExecutor == null) {
            // synchronous call (in the I/O loop)
            ioHandler.handshakeCompleted(this);
        } else {
            // asynchronous event
            ioExecutor.execute(new HandshakeCompletedEvent(this));
        }
    } catch (RuntimeException failure) {
        processException(failure);
    }
}
/**
 * Propagates a secure closed event : every filter of the chain is notified, from the last one to
 * the first one, then the {@link IoHandler} (if any), either through the service's
 * {@link IoHandlerExecutor} or directly in the calling thread when there is no executor. A
 * {@link RuntimeException} raised on the way is routed to {@link #processException(Exception)}.
 */
public void processSecureClosed() {
    if (IS_DEBUG) {
        LOG.debug("processing secure closed event for session {}", this);
    }

    try {
        // backward walk of the chain
        for (int i = chain.length - 1; i >= 0; i--) {
            chain[i].secureClosed(this);
        }

        final IoHandler ioHandler = getService().getIoHandler();

        if (ioHandler == null) {
            // nobody to notify
            return;
        }

        final IoHandlerExecutor ioExecutor = getService().getIoHandlerExecutor();

        if (ioExecutor == null) {
            // synchronous call (in the I/O loop)
            ioHandler.secureClosed(this);
        } else {
            // asynchronous event
            ioExecutor.execute(new SecureClosedEvent(this));
        }
    } catch (RuntimeException failure) {
        processException(failure);
    }
}
/**
 * Hands a write request over to the next filter of the write chain. The write chain is walked
 * backward : the write position is decremented, then either the filter found at that position
 * is invoked or, when the beginning of the chain has been passed (or the chain is empty), the
 * request is enqueued. The write position is restored before returning.
 *
 * @param message the write request to propagate
 */
@Override
public void callWriteNextFilter(WriteRequest message) {
    if (IS_DEBUG) {
        LOG.debug("calling next filter for writing for message '{}' position : {}", message, writeChainPosition);
    }

    writeChainPosition--;

    final boolean chainExhausted = (writeChainPosition < 0) || (chain.length == 0);

    if (!chainExhausted) {
        chain[writeChainPosition].messageWriting(this, message, this);
    } else {
        // end of chain processing
        enqueueWriteRequest(message);
    }

    writeChainPosition++;
}
/**
 * {@inheritDoc}
 * <p>
 * The write position is aligned on the current read position, then the message is wrapped in a
 * new {@link DefaultWriteRequest} and handed to {@link #callWriteNextFilter(WriteRequest)}.
 */
@Override
public void callWriteMessageForRead(Object message) {
    if (IS_DEBUG) {
        LOG.debug("calling filter for reading for message '{}' position : {} from a write event", message,
                readChainPosition);
    }

    // the write starts from where the read currently stands in the chain
    writeChainPosition = readChainPosition;

    final WriteRequest request = new DefaultWriteRequest(message);
    callWriteNextFilter(request);
}
/**
 * {@inheritDoc}
 * <p>
 * The read position is incremented, then either the filter found at that position is invoked
 * or, when the end of the chain has been reached, the {@link IoHandler} (if any) is notified :
 * directly when the service has no {@link IoHandlerExecutor}, otherwise through the executor.
 * In the latter case, a message which is the very buffer remembered in the thread local slot is
 * copied first, as the handler will run later. The read position is restored before returning.
 */
@Override
public void callReadNextFilter(Object message) {
    readChainPosition++;

    if (readChainPosition >= chain.length) {
        // end of chain processing
        IoHandler handler = getService().getIoHandler();

        if (handler != null) {
            IoHandlerExecutor executor = getService().getIoHandlerExecutor();

            if (executor == null) {
                // synchronous call (in the I/O loop)
                handler.messageReceived(this, message);
            } else if (message == tl.get()) {
                // asynchronous event on the buffer stored by the message received processing :
                // copy the bytebuffer
                if (IS_DEBUG) {
                    LOG.debug("copying bytebuffer before pushing to the executor");
                }

                ByteBuffer original = (ByteBuffer) message;

                // copy from the beginning, up to the limit
                original.rewind();

                // Only limit() bytes are copied : size the copy on that instead of the
                // capacity of the source buffer, which can be much larger than its content
                ByteBuffer clone = ByteBuffer.allocate(original.limit());
                clone.put(original);
                original.rewind();
                clone.flip();

                executor.execute(new ReceiveEvent(this, clone));
            } else {
                // asynchronous event on any other message : passed as is
                executor.execute(new ReceiveEvent(this, message));
            }
        }
    } else {
        chain[readChainPosition].messageReceived(this, message, this);
    }

    readChainPosition--;
}
/**
 * {@inheritDoc}
 * <p>
 * For a session which is connected, securing, secured or closing, the result contains the
 * padded session id, the service class name, the session side (server when the service is an
 * {@link IoServer}, client otherwise) and the two addresses. An address which cannot be obtained
 * is replaced by an explanatory text. Any other session is reported as disconnected.
 */
@Override
public String toString() {
    // SECURING and SECURED are included : presumably such a session still has a live
    // connection (TODO confirm), so it must not be reported as disconnected.
    boolean linked = isConnected() || isSecuring() || isConnectedSecured() || isClosing();

    if (!linked) {
        return "(" + getIdAsString() + ") Session disconnected ...";
    }

    String remote = null;
    String local = null;

    try {
        remote = String.valueOf(getRemoteAddress());
    } catch (Exception e) {
        remote = "Cannot get the remote address informations: " + e.getMessage();
    }

    try {
        local = String.valueOf(getLocalAddress());
    } catch (Exception e) {
        local = "Cannot get the local address informations: " + e.getMessage();
    }

    if (getService() instanceof IoServer) {
        return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
    }

    return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
}
/**
 * Gives the name of the class of the service this session belongs to : its canonical name when
 * it has one, its binary name otherwise (anonymous and local classes have no canonical name, in
 * which case {@link Class#getCanonicalName()} returns <code>null</code>).
 *
 * @return the service class name, never <code>null</code>
 */
private String getServiceName() {
    Class<?> serviceClass = getService().getClass();
    String canonicalName = serviceClass.getCanonicalName();

    if (canonicalName != null) {
        return canonicalName;
    }

    return serviceClass.getName();
}
/**
 * Builds the textual form of the session id : <code>0x</code> followed by the upper case
 * hexadecimal id, left padded with zeros up to 8 digits.
 *
 * @return the formatted session id
 */
private String getIdAsString() {
    final String hex = Long.toHexString(getId()).toUpperCase();
    final StringBuilder text = new StringBuilder("0x");

    // padding, only needed for ids having less than 8 hexadecimal digits
    for (int missing = 8 - hex.length(); missing > 0; missing--) {
        text.append('0');
    }

    return text.append(hex).toString();
}
}