blob: b4efb39039cf8910a97fc0789cadcd0bea3a7ffd [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.reactor.impl;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selector;
class SelectorImpl implements Selector {
private final java.nio.channels.Selector selector;
private final HashSet<Selectable> selectables = new HashSet<Selectable>();
private final HashSet<Selectable> readable = new HashSet<Selectable>();
private final HashSet<Selectable> writeable = new HashSet<Selectable>();
private final HashSet<Selectable> expired = new HashSet<Selectable>();
private final HashSet<Selectable> error = new HashSet<Selectable>();
protected SelectorImpl(IO io) throws IOException {
selector = io.selector();
}
@Override
public void add(Selectable selectable) throws IOException {
// Selectable can be 'null' - if this is the case it can only ever receive expiry events.
if (selectable.getChannel() != null) {
selectable.getChannel().configureBlocking(false);
SelectionKey key = selectable.getChannel().register(selector, 0);
key.attach(selectable);
}
selectables.add(selectable);
update(selectable);
}
@Override
public void update(Selectable selectable) {
if (selectable.getChannel() != null) {
int interestedOps = 0;
if (selectable.getChannel() instanceof SocketChannel &&
((SocketChannel)selectable.getChannel()).isConnectionPending()) {
interestedOps |= SelectionKey.OP_CONNECT;
} else {
if (selectable.isReading()) {
if (selectable.getChannel() instanceof ServerSocketChannel) {
interestedOps |= SelectionKey.OP_ACCEPT;
} else {
interestedOps |= SelectionKey.OP_READ;
}
}
if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
}
SelectionKey key = selectable.getChannel().keyFor(selector);
key.interestOps(interestedOps);
}
}
@Override
public void remove(Selectable selectable) {
if (selectable.getChannel() != null) {
SelectionKey key = selectable.getChannel().keyFor(selector);
if (key != null) {
key.cancel();
key.attach(null);
}
}
selectables.remove(selectable);
}
@Override
public void select(long timeout) throws IOException {
long now = System.currentTimeMillis();
if (timeout > 0) {
long deadline = 0;
// XXX: Note: this differs from the C code which requires a call to update() to make deadline changes take affect
for (Selectable selectable : selectables) {
long d = selectable.getDeadline();
if (d > 0) {
deadline = (deadline == 0) ? d : Math.min(deadline, d);
}
}
if (deadline > 0) {
long delta = deadline - now;
if (delta < 0) {
timeout = 0;
} else if (delta < timeout) {
timeout = delta;
}
}
}
error.clear();
long awoken = 0;
if (timeout > 0) {
long remainingTimeout = timeout;
while(remainingTimeout > 0) {
selector.select(remainingTimeout);
awoken = System.currentTimeMillis();
for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();) {
SelectionKey key = iterator.next();
if (key.isConnectable()) {
try {
((SocketChannel)key.channel()).finishConnect();
update((Selectable)key.attachment());
} catch(IOException ioException) {
SelectableImpl selectable = (SelectableImpl)key.attachment();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.getSymbol("proton:io"));
condition.setDescription(ioException.getMessage());
Transport transport = selectable.getTransport();
if (transport != null) {
transport.setCondition(condition);
transport.close_tail();
transport.close_head();
transport.pop(Math.max(0, transport.pending())); // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
}
error.add(selectable);
}
iterator.remove();
}
}
if (!selector.selectedKeys().isEmpty()) {
break;
}
remainingTimeout = remainingTimeout - (awoken - now);
}
} else {
selector.selectNow();
awoken = System.currentTimeMillis();
}
readable.clear();
writeable.clear();
expired.clear();
for (SelectionKey key : selector.selectedKeys()) {
Selectable selectable = (Selectable)key.attachment();
if (key.isReadable()) readable.add(selectable);
if (key.isAcceptable()) readable.add(selectable);
if (key.isWritable()) writeable.add(selectable);
}
selector.selectedKeys().clear();
// XXX: Note: this is different to the C code which evaluates expiry at the point the selectable is iterated over.
for (Selectable selectable : selectables) {
long deadline = selectable.getDeadline();
if (deadline > 0 && awoken >= deadline) {
expired.add(selectable);
}
}
}
@Override
public Iterator<Selectable> readable() {
return readable.iterator();
}
@Override
public Iterator<Selectable> writeable() {
return writeable.iterator();
}
@Override
public Iterator<Selectable> expired() {
return expired.iterator();
}
@Override
public Iterator<Selectable> error() {
return error.iterator();
}
@Override
public void free() {
try {
selector.close();
} catch(IOException ioException) {
// Ignore
}
}
}