| // Copyright 2016 Twitter. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.twitter.heron.common.basics; |
| |
| import java.io.IOException; |
| import java.nio.channels.ClosedChannelException; |
| import java.nio.channels.SelectableChannel; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.time.Duration; |
| import java.util.Iterator; |
| import java.util.Set; |
| |
| /** |
| * A NIOLooper, implementing WakeableLooper, is a class wrapping a Java NIO selector to dispatch events. |
| * It extends WakeableLooper, so it will execute in a while loop unless the exitLoop() is called. |
| * And in every execution, in tasksOnWakeup(), it will handle the selected keys. |
| * The NIOLooper should start by calling {@code loop()} |
| */ |
| |
| public class NIOLooper extends WakeableLooper { |
| private final Selector selector; |
| |
| public NIOLooper() throws IOException { |
| selector = Selector.open(); |
| |
| addNIOLooperTasks(); |
| } |
| |
| private void addNIOLooperTasks() { |
| Runnable task = new Runnable() { |
| @Override |
| public void run() { |
| handleSelectedKeys(); |
| } |
| }; |
| addTasksOnWakeup(task); |
| } |
| |
| @Override |
| public void doWait() { |
| // If timer task exists, the doWait() should wait not later than the time timer to execute |
| // It no timer exists, we consider it will wait forever until other threads call wakeUp() |
| Duration nextTimeoutInterval = getNextTimeoutInterval(); |
| |
| // doWait(timeout), which in fact is implemented by selector.select(timeout), and it will |
| // wake up, if other threads wake it up, it meets the timeout, one channel is selected, or |
| // the current thread is interrupted. |
| try { |
| if (nextTimeoutInterval.toMillis() > 0) { |
| // The select will take the timeout in unit of milli-seconds |
| selector.select(nextTimeoutInterval.toMillis()); |
| } else { |
| selector.selectNow(); |
| } |
| } catch (IOException e) { |
| e.printStackTrace(); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void wakeUp() { |
| selector.wakeup(); |
| } |
| |
| public void removeAllInterest(SelectableChannel channel) { |
| SelectionKey key = channel.keyFor(selector); |
| if (key != null) { |
| key.cancel(); |
| } |
| } |
| |
| // Handle the selected keys |
| private void handleSelectedKeys() { |
| Set<SelectionKey> selectedKeys = selector.selectedKeys(); |
| Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); |
| while (keyIterator.hasNext()) { |
| SelectionKey key = keyIterator.next(); |
| keyIterator.remove(); |
| |
| ISelectHandler callback = (ISelectHandler) key.attachment(); |
| |
| if (!key.isValid()) { |
| // This method key.channel() will continue to return the channel even after the |
| // key is cancelled. |
| callback.handleError(key.channel()); |
| continue; |
| } |
| |
| // We need to check whether the key is still valid since: |
| // 1. The key could be cancelled by last operation |
| // 2. The process might not fail-fast or throw exceptions after the key is cancelled |
| if (key.isValid() && key.isWritable()) { |
| callback.handleWrite(key.channel()); |
| } |
| |
| if (key.isValid() && key.isReadable()) { |
| callback.handleRead(key.channel()); |
| } |
| |
| if (key.isValid() && key.isConnectable()) { |
| callback.handleConnect(key.channel()); |
| } |
| |
| if (key.isValid() && key.isAcceptable()) { |
| callback.handleAccept(key.channel()); |
| } |
| |
| } |
| } |
| |
| public boolean isChannelValid(SelectableChannel channel) { |
| SelectionKey key = channel.keyFor(selector); |
| return key != null && key.isValid(); |
| } |
| |
| /** |
| * Followings are the register, unregister, isRegister for different operations for the selector and channel |
| */ |
| public void registerRead(SelectableChannel channel, ISelectHandler callback) |
| throws ClosedChannelException { |
| assert channel.keyFor(selector) == null |
| || (channel.keyFor(selector).interestOps() & SelectionKey.OP_CONNECT) == 0; |
| addInterest(channel, SelectionKey.OP_READ, callback); |
| } |
| |
| public void unregisterRead(SelectableChannel channel) { |
| removeInterest(channel, SelectionKey.OP_READ); |
| } |
| |
| public boolean isReadRegistered(SelectableChannel channel) { |
| return isInterestRegistered(channel, SelectionKey.OP_READ); |
| } |
| |
| public void registerConnect(SelectableChannel channel, ISelectHandler callback) |
| throws ClosedChannelException { |
| // This channel should be first use |
| assert channel.keyFor(selector) == null; |
| addInterest(channel, SelectionKey.OP_CONNECT, callback); |
| } |
| |
| public void unregisterConnect(SelectableChannel channel) { |
| removeInterest(channel, SelectionKey.OP_CONNECT); |
| } |
| |
| public boolean isConnectRegistered(SelectableChannel channel) { |
| return isInterestRegistered(channel, SelectionKey.OP_CONNECT); |
| } |
| |
| public void registerAccept(SelectableChannel channel, ISelectHandler callback) |
| throws ClosedChannelException { |
| addInterest(channel, SelectionKey.OP_ACCEPT, callback); |
| } |
| |
| public void unregisterAccept(SelectableChannel channel) { |
| removeInterest(channel, SelectionKey.OP_ACCEPT); |
| } |
| |
| public boolean isAcceptRegistered(SelectableChannel channel) { |
| return isInterestRegistered(channel, SelectionKey.OP_ACCEPT); |
| } |
| |
| public void registerWrite(SelectableChannel channel, ISelectHandler callback) |
| throws ClosedChannelException { |
| addInterest(channel, SelectionKey.OP_WRITE, callback); |
| } |
| |
| public void unregisterWrite(SelectableChannel channel) { |
| removeInterest(channel, SelectionKey.OP_WRITE); |
| } |
| |
| public boolean isWriteRegistered(SelectableChannel channel) { |
| return isInterestRegistered(channel, SelectionKey.OP_WRITE); |
| } |
| |
| /** |
| * Register an operation interest on a SelectableChannel, with ISelectHandler as callback attachment |
| * There are two cases when trying to register an interest |
| * 1. The whole key does not exist; no interests ever registered for this channel |
| * 2. The key exists due to other interests registered but not the one we are adding |
| * <p> |
| * In 1st case, we just register this channel with operation on the given Selector |
| * In 2nd case, we have to make sure the state of NIOLooper is clean: |
| * 1. Key has to be valid |
| * 2. The interest has not yet been registered |
| * 3. If old attached ISelectHandler exists, it has to be the same as new one |
| * If any one of above 3 conditions are not met, RuntimeException would be thrown. |
| * |
| * @param channel The Selectable to register operation interest |
| * @param operation The operation interest to register |
| * @param callback The Callback to handle |
| * @throws ClosedChannelException if Channel is closed when trying to register an interest |
| */ |
| private void addInterest(SelectableChannel channel, |
| int operation, |
| ISelectHandler callback) |
| throws ClosedChannelException { |
| |
| SelectionKey key = channel.keyFor(selector); |
| |
| if (key == null) { |
| channel.register(selector, operation, callback); |
| } else if (!key.isValid()) { |
| throw new RuntimeException( |
| String.format("Unable to add %d in %s due to key is invalid", operation, channel)); |
| } else { |
| // Key is not null and key is valid |
| if ((key.interestOps() & operation) != 0) { |
| throw new RuntimeException( |
| String.format("%d has been registered in %s", operation, channel)); |
| } |
| if (key.attachment() == null) { |
| key.attach(callback); |
| } else { |
| if (callback != key.attachment()) { |
| throw new RuntimeException("Unmatched SelectHandler has already been attached" |
| + " for other operation"); |
| } |
| // If call == key.attachment |
| // Just skip |
| } |
| key.interestOps(key.interestOps() | operation); |
| } |
| } |
| |
| /** |
| * Remove one operation interest on a SelectableChannel. |
| * The SelectableChannel has to be registered with Selector ahead. |
| * Otherwise, NullPointerExceptions would throw |
| * The key for SelectableChannel has to be valid. |
| * Otherwise, InvalidValid Exception would throw. |
| * |
| * @param channel the SelectableChannel to remove operation interest |
| * @param operation the interest to remove |
| */ |
| private void removeInterest(SelectableChannel channel, int operation) { |
| SelectionKey key = channel.keyFor(selector); |
| |
| // Exception would be thrown if key is null or key is inValid |
| // We do not need double check it ahead |
| key.interestOps(key.interestOps() & (~operation)); |
| } |
| |
| /** |
| * Check whether an operation interest was registered on a SelectableChannel |
| * There are two cases that interest is not registered |
| * 1. The whole key does not exist; no interests ever registered for this channel |
| * 2. The key exists due to other interests registered but not the one we are adding |
| * If the key exists, the key for SelectableChannel has to be valid. |
| * Otherwise, InvalidValid Exception would throw. |
| * |
| * @param channel The Selectable to check |
| * @param operation The operation interest to check |
| */ |
| private boolean isInterestRegistered(SelectableChannel channel, int operation) { |
| SelectionKey key = channel.keyFor(selector); |
| |
| return key != null && (key.interestOps() & operation) != 0; |
| } |
| } |