blob: 42b74ceb3162d9ce6874dd62f867bca45ece8457 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.mina.transport.socket.nio;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.AbstractIoAcceptor;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoProcessor;
import org.apache.mina.core.service.TransportMetadata;
import org.apache.mina.core.session.AbstractIoSession;
import org.apache.mina.core.session.ExpiringSessionRecycler;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.core.session.IoSessionRecycler;
import org.apache.mina.core.write.WriteRequest;
import org.apache.mina.core.write.WriteRequestQueue;
import org.apache.mina.transport.socket.DatagramAcceptor;
import org.apache.mina.transport.socket.DatagramSessionConfig;
import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
import org.apache.mina.util.ExceptionMonitor;
/**
* {@link IoAcceptor} for datagram transport (UDP/IP).
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
* @org.apache.xbean.XBean
*/
public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
/**
* A session recycler that is used to retrieve an existing session, unless it's too old.
**/
private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
/**
* A timeout used for the select, as we need to get out to deal with idle
* sessions
*/
private static final long SELECT_TIMEOUT = 1000L;
/** A lock used to protect the selector to be waked up before it's created */
private final Semaphore lock = new Semaphore(1);
/** A queue used to store the list of pending Binds */
private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
.synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
private volatile boolean selectable;
/** The thread responsible of accepting incoming requests */
private Acceptor acceptor;
private long lastIdleCheckTime;
/** The Selector used by this acceptor */
private volatile Selector selector;
/**
* Creates a new instance.
*/
public NioDatagramAcceptor() {
this(new DefaultDatagramSessionConfig(), null);
}
/**
* Creates a new instance.
*
* @param executor The executor to use
*/
public NioDatagramAcceptor(Executor executor) {
this(new DefaultDatagramSessionConfig(), executor);
}
/**
* Creates a new instance.
*/
private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
super(sessionConfig, executor);
try {
init();
selectable = true;
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeIoException("Failed to initialize.", e);
} finally {
if (!selectable) {
try {
destroy();
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
}
/**
* This private class is used to accept incoming connection from
* clients. It's an infinite loop, which can be stopped when all
* the registered handles have been removed (unbound).
*/
private class Acceptor implements Runnable {
@Override
public void run() {
int nHandles = 0;
lastIdleCheckTime = System.currentTimeMillis();
// Release the lock
lock.release();
while (selectable) {
try {
int selected = select(SELECT_TIMEOUT);
nHandles += registerHandles();
if (nHandles == 0) {
try {
lock.acquire();
if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
acceptor = null;
break;
}
} finally {
lock.release();
}
}
if (selected > 0) {
processReadySessions(selectedHandles());
}
long currentTime = System.currentTimeMillis();
flushSessions(currentTime);
nHandles -= unregisterHandles();
notifyIdleSessions(currentTime);
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
}
}
}
if (selectable && isDisposing()) {
selectable = false;
try {
destroy();
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setValue(true);
}
}
}
}
private int registerHandles() {
for (;;) {
AcceptorOperationFuture req = registerQueue.poll();
if (req == null) {
break;
}
Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
List<SocketAddress> localAddresses = req.getLocalAddresses();
try {
for (SocketAddress socketAddress : localAddresses) {
DatagramChannel handle = open(socketAddress);
newHandles.put(localAddress(handle), handle);
}
boundHandles.putAll(newHandles);
getListeners().fireServiceActivated();
req.setDone();
return newHandles.size();
} catch (Exception e) {
req.setException(e);
} finally {
// Roll back if failed to bind all addresses.
if (req.getException() != null) {
for (DatagramChannel handle : newHandles.values()) {
try {
close(handle);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
wakeup();
}
}
}
return 0;
}
private void processReadySessions(Set<SelectionKey> handles) {
final Iterator<SelectionKey> iterator = handles.iterator();
while (iterator.hasNext()) {
try {
final SelectionKey key = iterator.next();
final DatagramChannel handle = (DatagramChannel) key.channel();
if (key.isValid()) {
if (key.isReadable()) {
readHandle(handle);
}
if (key.isWritable()) {
for (IoSession session : getManagedSessions().values()) {
final NioSession x = (NioSession) session;
if (x.getChannel() == handle) {
scheduleFlush(x);
}
}
}
}
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
iterator.remove();
}
}
}
private boolean scheduleFlush(NioSession session) {
// Set the schedule for flush flag if the session
// has not already be added to the flushingSessions
// queue
if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
return true;
} else {
return false;
}
}
private void readHandle(DatagramChannel handle) throws Exception {
IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
SocketAddress remoteAddress = receive(handle, readBuf);
if (remoteAddress != null) {
IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
readBuf.flip();
if (!session.isReadSuspended()) {
session.getFilterChain().fireMessageReceived(readBuf);
}
}
}
private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
DatagramChannel handle = boundHandles.get(localAddress);
if (handle == null) {
throw new IllegalArgumentException("Unknown local address: " + localAddress);
}
IoSession session;
synchronized (sessionRecycler) {
session = sessionRecycler.recycle(remoteAddress);
if (session != null) {
return session;
}
// If a new session needs to be created.
NioSession newSession = newSession(this, handle, remoteAddress);
getSessionRecycler().put(newSession);
session = newSession;
}
initSession(session, null, null);
try {
this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
getListeners().fireSessionCreated(session);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
return session;
}
private void flushSessions(long currentTime) {
for (;;) {
NioSession session = flushingSessions.poll();
if (session == null) {
break;
}
// Reset the Schedule for flush flag for this session,
// as we are flushing it now
session.unscheduledForFlush();
try {
boolean flushedAll = flush(session, currentTime);
if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
scheduleFlush(session);
}
} catch (Exception e) {
session.getFilterChain().fireExceptionCaught(e);
}
}
}
private boolean flush(NioSession session, long currentTime) throws Exception {
final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
+ (session.getConfig().getMaxReadBufferSize() >>> 1);
int writtenBytes = 0;
try {
for (;;) {
WriteRequest req = session.getCurrentWriteRequest();
if (req == null) {
req = writeRequestQueue.poll(session);
if (req == null) {
setInterestedInWrite(session, false);
break;
}
session.setCurrentWriteRequest(req);
}
IoBuffer buf = (IoBuffer) req.getMessage();
if (buf.remaining() == 0) {
// Clear and fire event
session.setCurrentWriteRequest(null);
buf.reset();
session.getFilterChain().fireMessageSent(req);
continue;
}
SocketAddress destination = req.getDestination();
if (destination == null) {
destination = session.getRemoteAddress();
}
int localWrittenBytes = send(session, buf, destination);
if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
// Kernel buffer is full or wrote too much
setInterestedInWrite(session, true);
return false;
} else {
setInterestedInWrite(session, false);
// Clear and fire event
session.setCurrentWriteRequest(null);
writtenBytes += localWrittenBytes;
buf.reset();
session.getFilterChain().fireMessageSent(req);
}
}
} finally {
session.increaseWrittenBytes(writtenBytes, currentTime);
}
return true;
}
private int unregisterHandles() {
int nHandles = 0;
for (;;) {
AcceptorOperationFuture request = cancelQueue.poll();
if (request == null) {
break;
}
// close the channels
for (SocketAddress socketAddress : request.getLocalAddresses()) {
DatagramChannel handle = boundHandles.remove(socketAddress);
if (handle == null) {
continue;
}
try {
close(handle);
wakeup(); // wake up again to trigger thread death
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
nHandles++;
}
}
request.setDone();
}
return nHandles;
}
private void notifyIdleSessions(long currentTime) {
// process idle sessions
if (currentTime - lastIdleCheckTime >= 1000) {
lastIdleCheckTime = currentTime;
AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
}
}
/**
* Starts the inner Acceptor thread.
*/
private void startupAcceptor() throws InterruptedException {
if (!selectable) {
registerQueue.clear();
cancelQueue.clear();
flushingSessions.clear();
}
lock.acquire();
if (acceptor == null) {
acceptor = new Acceptor();
executeWorker(acceptor);
} else {
lock.release();
}
}
protected void init() throws Exception {
this.selector = Selector.open();
}
/**
* {@inheritDoc}
*/
@Override
public void add(NioSession session) {
// Nothing to do for UDP
}
/**
* {@inheritDoc}
*/
@Override
protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
// Create a bind request as a Future operation. When the selector
// have handled the registration, it will signal this future.
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
// adds the Registration request to the queue for the Workers
// to handle
registerQueue.add(request);
// creates the Acceptor instance and has the local
// executor kick it off.
startupAcceptor();
// As we just started the acceptor, we have to unblock the select()
// in order to process the bind request we just have added to the
// registerQueue.
try {
lock.acquire();
// Wait a bit to give a chance to the Acceptor thread to do the select()
Thread.sleep(10);
wakeup();
} finally {
lock.release();
}
// Now, we wait until this request is completed.
request.awaitUninterruptibly();
if (request.getException() != null) {
throw request.getException();
}
// Update the local addresses.
// setLocalAddresses() shouldn't be called from the worker thread
// because of deadlock.
Set<SocketAddress> newLocalAddresses = new HashSet<>();
for (DatagramChannel handle : boundHandles.values()) {
newLocalAddresses.add(localAddress(handle));
}
return newLocalAddresses;
}
protected void close(DatagramChannel handle) throws Exception {
SelectionKey key = handle.keyFor(selector);
if (key != null) {
key.cancel();
}
handle.disconnect();
handle.close();
}
protected void destroy() throws Exception {
if (selector != null) {
selector.close();
}
}
/**
* {@inheritDoc}
*/
@Override
protected void dispose0() throws Exception {
unbind();
startupAcceptor();
wakeup();
}
/**
* {@inheritDoc}
*/
@Override
public void flush(NioSession session) {
if (scheduleFlush(session)) {
wakeup();
}
}
@Override
public InetSocketAddress getDefaultLocalAddress() {
return (InetSocketAddress) super.getDefaultLocalAddress();
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) super.getLocalAddress();
}
/**
* {@inheritDoc}
*/
@Override
public DatagramSessionConfig getSessionConfig() {
return (DatagramSessionConfig) sessionConfig;
}
@Override
public final IoSessionRecycler getSessionRecycler() {
return sessionRecycler;
}
@Override
public TransportMetadata getTransportMetadata() {
return NioDatagramSession.METADATA;
}
protected boolean isReadable(DatagramChannel handle) {
SelectionKey key = handle.keyFor(selector);
if ((key == null) || (!key.isValid())) {
return false;
}
return key.isReadable();
}
protected boolean isWritable(DatagramChannel handle) {
SelectionKey key = handle.keyFor(selector);
if ((key == null) || (!key.isValid())) {
return false;
}
return key.isWritable();
}
protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
InetAddress inetAddress = inetSocketAddress.getAddress();
if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
// Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
// even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
// ANY address in the map.
byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
byte[] ipV4Address = new byte[4];
System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);
InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
} else {
return inetSocketAddress;
}
}
protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
SocketAddress remoteAddress) {
SelectionKey key = handle.keyFor(selector);
if ((key == null) || (!key.isValid())) {
return null;
}
NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
newSession.setSelectionKey(key);
return newSession;
}
/**
* {@inheritDoc}
*/
@Override
public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
if (isDisposing()) {
throw new IllegalStateException("The Acceptor is being disposed.");
}
if (remoteAddress == null) {
throw new IllegalArgumentException("remoteAddress");
}
synchronized (bindLock) {
if (!isActive()) {
throw new IllegalStateException("Can't create a session from a unbound service.");
}
try {
return newSessionWithoutLock(remoteAddress, localAddress);
} catch (RuntimeException | Error e) {
throw e;
} catch (Exception e) {
throw new RuntimeIoException("Failed to create a session.", e);
}
}
}
protected DatagramChannel open(SocketAddress localAddress) throws Exception {
final DatagramChannel ch = DatagramChannel.open();
boolean success = false;
try {
new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
ch.configureBlocking(false);
try {
ch.socket().bind(localAddress);
} catch (IOException ioe) {
// Add some info regarding the address we try to bind to the
// message
String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
+ ioe.getMessage();
Exception e = new IOException(newMessage);
e.initCause(ioe.getCause());
// And close the channel
ch.close();
throw e;
}
ch.register(selector, SelectionKey.OP_READ);
success = true;
} finally {
if (!success) {
close(ch);
}
}
return ch;
}
protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
return handle.receive(buffer.buf());
}
/**
* {@inheritDoc}
*/
@Override
public void remove(NioSession session) {
getSessionRecycler().remove(session);
getListeners().fireSessionDestroyed(session);
}
protected int select() throws Exception {
return selector.select();
}
protected int select(long timeout) throws Exception {
return selector.select(timeout);
}
protected Set<SelectionKey> selectedHandles() {
return selector.selectedKeys();
}
protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
}
@Override
public void setDefaultLocalAddress(InetSocketAddress localAddress) {
setDefaultLocalAddress((SocketAddress) localAddress);
}
protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
SelectionKey key = session.getSelectionKey();
if (key == null) {
return;
}
int newInterestOps = key.interestOps();
if (isInterested) {
newInterestOps |= SelectionKey.OP_WRITE;
} else {
newInterestOps &= ~SelectionKey.OP_WRITE;
}
key.interestOps(newInterestOps);
}
@Override
public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
synchronized (bindLock) {
if (isActive()) {
throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
}
if (sessionRecycler == null) {
sessionRecycler = DEFAULT_RECYCLER;
}
this.sessionRecycler = sessionRecycler;
}
}
/**
* {@inheritDoc}
*/
@Override
protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
cancelQueue.add(request);
startupAcceptor();
wakeup();
request.awaitUninterruptibly();
if (request.getException() != null) {
throw request.getException();
}
}
/**
* {@inheritDoc}
*/
@Override
public void updateTrafficControl(NioSession session) {
// Nothing to do
}
protected void wakeup() {
selector.wakeup();
}
/**
* {@inheritDoc}
*/
@Override
public void write(NioSession session, WriteRequest writeRequest) {
// We will try to write the message directly
long currentTime = System.currentTimeMillis();
final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
+ (session.getConfig().getMaxReadBufferSize() >>> 1);
int writtenBytes = 0;
// Deal with the special case of a Message marker (no bytes in the request)
// We just have to return after having calle dthe messageSent event
IoBuffer buf = (IoBuffer) writeRequest.getMessage();
if (buf.remaining() == 0) {
// Clear and fire event
session.setCurrentWriteRequest(null);
buf.reset();
session.getFilterChain().fireMessageSent(writeRequest);
return;
}
// Now, write the data
try {
for (;;) {
if (writeRequest == null) {
writeRequest = writeRequestQueue.poll(session);
if (writeRequest == null) {
setInterestedInWrite(session, false);
break;
}
session.setCurrentWriteRequest(writeRequest);
}
buf = (IoBuffer) writeRequest.getMessage();
if (buf.remaining() == 0) {
// Clear and fire event
session.setCurrentWriteRequest(null);
session.getFilterChain().fireMessageSent(writeRequest);
continue;
}
SocketAddress destination = writeRequest.getDestination();
if (destination == null) {
destination = session.getRemoteAddress();
}
int localWrittenBytes = send(session, buf, destination);
if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
// Kernel buffer is full or wrote too much
setInterestedInWrite(session, true);
writeRequestQueue.offer(session, writeRequest);
scheduleFlush(session);
break;
} else {
setInterestedInWrite(session, false);
// Clear and fire event
session.setCurrentWriteRequest(null);
writtenBytes += localWrittenBytes;
session.getFilterChain().fireMessageSent(writeRequest);
break;
}
}
} catch (Exception e) {
session.getFilterChain().fireExceptionCaught(e);
} finally {
session.increaseWrittenBytes(writtenBytes, currentTime);
}
}
}