blob: 3b31eb1bd4cb1b70fb89f6206362cf2f9a6f2075 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.common.basics;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.time.Duration;
import java.util.Iterator;
import java.util.Set;
/**
* A NIOLooper, implementing WakeableLooper, is a class wrapping a Java NIO selector to dispatch events.
* It extends WakeableLooper, so it will execute in a while loop unless the exitLoop() is called.
* And in every execution, in tasksOnWakeup(), it will handle the selected keys.
* The NIOLooper should start by calling {@code loop()}
*/
public class NIOLooper extends WakeableLooper {
private final Selector selector;
public NIOLooper() throws IOException {
selector = Selector.open();
addNIOLooperTasks();
}
private void addNIOLooperTasks() {
Runnable task = new Runnable() {
@Override
public void run() {
handleSelectedKeys();
}
};
addTasksOnWakeup(task);
}
@Override
public void doWait() {
// If timer task exists, the doWait() should wait not later than the time timer to execute
// It no timer exists, we consider it will wait forever until other threads call wakeUp()
Duration nextTimeoutInterval = getNextTimeoutInterval();
// doWait(timeout), which in fact is implemented by selector.select(timeout), and it will
// wake up, if other threads wake it up, it meets the timeout, one channel is selected, or
// the current thread is interrupted.
try {
if (nextTimeoutInterval.toMillis() > 0) {
// The select will take the timeout in unit of milli-seconds
selector.select(nextTimeoutInterval.toMillis());
} else {
selector.selectNow();
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
@Override
public void wakeUp() {
selector.wakeup();
}
public void removeAllInterest(SelectableChannel channel) {
SelectionKey key = channel.keyFor(selector);
if (key != null) {
key.cancel();
}
}
// Handle the selected keys
private void handleSelectedKeys() {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
ISelectHandler callback = (ISelectHandler) key.attachment();
if (!key.isValid()) {
// This method key.channel() will continue to return the channel even after the
// key is cancelled.
callback.handleError(key.channel());
continue;
}
// We need to check whether the key is still valid since:
// 1. The key could be cancelled by last operation
// 2. The process might not fail-fast or throw exceptions after the key is cancelled
if (key.isValid() && key.isWritable()) {
callback.handleWrite(key.channel());
}
if (key.isValid() && key.isReadable()) {
callback.handleRead(key.channel());
}
if (key.isValid() && key.isConnectable()) {
callback.handleConnect(key.channel());
}
if (key.isValid() && key.isAcceptable()) {
callback.handleAccept(key.channel());
}
}
}
public boolean isChannelValid(SelectableChannel channel) {
SelectionKey key = channel.keyFor(selector);
return key != null && key.isValid();
}
/**
* Followings are the register, unregister, isRegister for different operations for the selector and channel
*/
public void registerRead(SelectableChannel channel, ISelectHandler callback)
throws ClosedChannelException {
assert channel.keyFor(selector) == null
|| (channel.keyFor(selector).interestOps() & SelectionKey.OP_CONNECT) == 0;
addInterest(channel, SelectionKey.OP_READ, callback);
}
public void unregisterRead(SelectableChannel channel) {
removeInterest(channel, SelectionKey.OP_READ);
}
public boolean isReadRegistered(SelectableChannel channel) {
return isInterestRegistered(channel, SelectionKey.OP_READ);
}
public void registerConnect(SelectableChannel channel, ISelectHandler callback)
throws ClosedChannelException {
// This channel should be first use
assert channel.keyFor(selector) == null;
addInterest(channel, SelectionKey.OP_CONNECT, callback);
}
public void unregisterConnect(SelectableChannel channel) {
removeInterest(channel, SelectionKey.OP_CONNECT);
}
public boolean isConnectRegistered(SelectableChannel channel) {
return isInterestRegistered(channel, SelectionKey.OP_CONNECT);
}
public void registerAccept(SelectableChannel channel, ISelectHandler callback)
throws ClosedChannelException {
addInterest(channel, SelectionKey.OP_ACCEPT, callback);
}
public void unregisterAccept(SelectableChannel channel) {
removeInterest(channel, SelectionKey.OP_ACCEPT);
}
public boolean isAcceptRegistered(SelectableChannel channel) {
return isInterestRegistered(channel, SelectionKey.OP_ACCEPT);
}
public void registerWrite(SelectableChannel channel, ISelectHandler callback)
throws ClosedChannelException {
addInterest(channel, SelectionKey.OP_WRITE, callback);
}
public void unregisterWrite(SelectableChannel channel) {
removeInterest(channel, SelectionKey.OP_WRITE);
}
public boolean isWriteRegistered(SelectableChannel channel) {
return isInterestRegistered(channel, SelectionKey.OP_WRITE);
}
/**
* Register an operation interest on a SelectableChannel, with ISelectHandler as callback attachment
* There are two cases when trying to register an interest
* 1. The whole key does not exist; no interests ever registered for this channel
* 2. The key exists due to other interests registered but not the one we are adding
* <p>
* In 1st case, we just register this channel with operation on the given Selector
* In 2nd case, we have to make sure the state of NIOLooper is clean:
* 1. Key has to be valid
* 2. The interest has not yet been registered
* 3. If old attached ISelectHandler exists, it has to be the same as new one
* If any one of above 3 conditions are not met, RuntimeException would be thrown.
*
* @param channel The Selectable to register operation interest
* @param operation The operation interest to register
* @param callback The Callback to handle
* @throws ClosedChannelException if Channel is closed when trying to register an interest
*/
private void addInterest(SelectableChannel channel,
int operation,
ISelectHandler callback)
throws ClosedChannelException {
SelectionKey key = channel.keyFor(selector);
if (key == null) {
channel.register(selector, operation, callback);
} else if (!key.isValid()) {
throw new RuntimeException(
String.format("Unable to add %d in %s due to key is invalid", operation, channel));
} else {
// Key is not null and key is valid
if ((key.interestOps() & operation) != 0) {
throw new RuntimeException(
String.format("%d has been registered in %s", operation, channel));
}
if (key.attachment() == null) {
key.attach(callback);
} else {
if (callback != key.attachment()) {
throw new RuntimeException("Unmatched SelectHandler has already been attached"
+ " for other operation");
}
// If call == key.attachment
// Just skip
}
key.interestOps(key.interestOps() | operation);
}
}
/**
* Remove one operation interest on a SelectableChannel.
* The SelectableChannel has to be registered with Selector ahead.
* Otherwise, NullPointerExceptions would throw
* The key for SelectableChannel has to be valid.
* Otherwise, InvalidValid Exception would throw.
*
* @param channel the SelectableChannel to remove operation interest
* @param operation the interest to remove
*/
private void removeInterest(SelectableChannel channel, int operation) {
SelectionKey key = channel.keyFor(selector);
// Exception would be thrown if key is null or key is inValid
// We do not need double check it ahead
key.interestOps(key.interestOps() & (~operation));
}
/**
* Check whether an operation interest was registered on a SelectableChannel
* There are two cases that interest is not registered
* 1. The whole key does not exist; no interests ever registered for this channel
* 2. The key exists due to other interests registered but not the one we are adding
* If the key exists, the key for SelectableChannel has to be valid.
* Otherwise, InvalidValid Exception would throw.
*
* @param channel The Selectable to check
* @param operation The operation interest to check
*/
private boolean isInterestRegistered(SelectableChannel channel, int operation) {
SelectionKey key = channel.keyFor(selector);
return key != null && (key.interestOps() & operation) != 0;
}
}