| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.qpid.proton.reactor.impl; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.nio.channels.Channel; |
| import java.nio.channels.SocketChannel; |
| import java.util.Iterator; |
| |
| import org.apache.qpid.proton.Proton; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.transport.ErrorCondition; |
| import org.apache.qpid.proton.engine.BaseHandler; |
| import org.apache.qpid.proton.engine.Connection; |
| import org.apache.qpid.proton.engine.EndpointState; |
| import org.apache.qpid.proton.engine.Event; |
| import org.apache.qpid.proton.engine.Sasl; |
| import org.apache.qpid.proton.engine.Transport; |
| import org.apache.qpid.proton.engine.impl.TransportImpl; |
| import org.apache.qpid.proton.engine.Record; |
| import org.apache.qpid.proton.reactor.Reactor; |
| import org.apache.qpid.proton.reactor.Selectable; |
| import org.apache.qpid.proton.reactor.Selectable.Callback; |
| import org.apache.qpid.proton.reactor.Selector; |
| import org.apache.qpid.proton.reactor.Acceptor; |
| import org.apache.qpid.proton.reactor.impl.AcceptorImpl; |
| |
| public class IOHandler extends BaseHandler { |
| |
| // pni_handle_quiesced from connection.c |
| private void handleQuiesced(Reactor reactor, Selector selector) throws IOException { |
| // check if we are still quiesced, other handlers of |
| // PN_REACTOR_QUIESCED could have produced more events to process |
| if (!reactor.quiesced()) return; |
| selector.select(reactor.getTimeout()); |
| reactor.mark(); |
| Iterator<Selectable> selectables = selector.readable(); |
| while(selectables.hasNext()) { |
| selectables.next().readable(); |
| } |
| selectables = selector.writeable(); |
| while(selectables.hasNext()) { |
| selectables.next().writeable(); |
| } |
| selectables = selector.expired(); |
| while(selectables.hasNext()) { |
| selectables.next().expired(); |
| } |
| selectables = selector.error(); |
| while(selectables.hasNext()) { |
| selectables.next().error(); |
| } |
| reactor.yield(); |
| } |
| |
| // pni_handle_open(...) from connection.c |
| private void handleOpen(Reactor reactor, Event event) { |
| Connection connection = event.getConnection(); |
| if (connection.getRemoteState() != EndpointState.UNINITIALIZED) { |
| return; |
| } |
| // Outgoing Reactor connections set the virtual host automatically using the |
| // following rules: |
| String vhost = connection.getHostname(); |
| if (vhost == null) { |
| // setHostname never called, use the host from the connection's |
| // socket address as the default virtual host: |
| String conAddr = reactor.getConnectionAddress(connection); |
| if (conAddr != null) { |
| Address addr = new Address(conAddr); |
| connection.setHostname(addr.getHost()); |
| } |
| } else if (vhost.isEmpty()) { |
| // setHostname called explictly with a null string. This allows |
| // the application to completely avoid sending a virtual host |
| // name |
| connection.setHostname(null); |
| } else { |
| // setHostname set by application - use it. |
| } |
| Transport transport = Proton.transport(); |
| |
| int maxFrameSizeOption = reactor.getOptions().getMaxFrameSize(); |
| if (maxFrameSizeOption != 0) { |
| transport.setMaxFrameSize(maxFrameSizeOption); |
| } |
| |
| if (reactor.getOptions().isEnableSaslByDefault()) { |
| Sasl sasl = transport.sasl(); |
| sasl.client(); |
| sasl.setMechanisms("ANONYMOUS"); |
| } |
| |
| transport.bind(connection); |
| } |
| |
| // pni_handle_bound(...) from connection.c |
| // If this connection is an outgoing connection - not an incoming |
| // connection created by the Acceptor - create a socket connection to |
| // the peer address. |
| private void handleBound(Reactor reactor, Event event) { |
| Connection connection = event.getConnection(); |
| Record conn_recs = connection.attachments(); |
| if (conn_recs.get(AcceptorImpl.CONNECTION_ACCEPTOR_KEY, Acceptor.class) != null) { |
| // Connection was created via the Acceptor, so the socket already |
| // exists |
| return; |
| } |
| String url = reactor.getConnectionAddress(connection); |
| String hostname = connection.getHostname(); |
| int port = 5672; |
| |
| if (url != null) { |
| Address address = new Address(url); |
| hostname = address.getHost(); |
| try { |
| port = Integer.parseInt(address.getImpliedPort()); |
| } catch(NumberFormatException nfe) { |
| throw new IllegalArgumentException("Not a valid host: " + url, nfe); |
| } |
| } else if (hostname != null && !hostname.equals("")) { |
| // Backward compatibility with old code that illegally overloaded |
| // the connection's hostname |
| int colonIndex = hostname.indexOf(':'); |
| if (colonIndex >= 0) { |
| try { |
| port = Integer.parseInt(hostname.substring(colonIndex+1)); |
| } catch(NumberFormatException nfe) { |
| throw new IllegalArgumentException("Not a valid host: " + hostname, nfe); |
| } |
| hostname = hostname.substring(0, colonIndex); |
| } |
| } else { |
| throw new IllegalStateException("No address provided for Connection"); |
| } |
| |
| Transport transport = event.getConnection().getTransport(); |
| Socket socket = null; // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET |
| try { |
| SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel(); |
| socketChannel.configureBlocking(false); |
| socketChannel.connect(new InetSocketAddress(hostname, port)); |
| socket = socketChannel.socket(); |
| } catch(Exception exception) { |
| ErrorCondition condition = new ErrorCondition(); |
| condition.setCondition(Symbol.getSymbol("proton:io")); |
| condition.setDescription(exception.getMessage()); |
| transport.setCondition(condition); |
| transport.close_tail(); |
| transport.close_head(); |
| transport.pop(Math.max(0, transport.pending())); // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) |
| } |
| selectableTransport(reactor, socket, transport); |
| } |
| |
| // pni_connection_capacity from connection.c |
| private static int capacity(Selectable selectable) { |
| Transport transport = ((SelectableImpl)selectable).getTransport(); |
| int capacity = transport.capacity(); |
| if (capacity < 0) { |
| if (transport.isClosed()) { |
| selectable.terminate(); |
| } |
| } |
| return capacity; |
| } |
| |
| // pni_connection_pending from connection.c |
| private static int pending(Selectable selectable) { |
| Transport transport = ((SelectableImpl)selectable).getTransport(); |
| int pending = transport.pending(); |
| if (pending < 0) { |
| if (transport.isClosed()) { |
| selectable.terminate(); |
| } |
| } |
| return pending; |
| } |
| |
| // pni_connection_deadline from connection.c |
| private static long deadline(SelectableImpl selectable) { |
| Reactor reactor = selectable.getReactor(); |
| Transport transport = selectable.getTransport(); |
| long deadline = transport.tick(reactor.now()); |
| return deadline; |
| } |
| |
| // pni_connection_update from connection.c |
| private static void update(Selectable selectable) { |
| SelectableImpl selectableImpl = (SelectableImpl)selectable; |
| int c = capacity(selectableImpl); |
| int p = pending(selectableImpl); |
| selectable.setReading(c > 0); |
| selectable.setWriting(p > 0); |
| selectable.setDeadline(deadline(selectableImpl)); |
| } |
| |
| // pni_connection_readable from connection.c |
| private static Callback connectionReadable = new Callback() { |
| @Override |
| public void run(Selectable selectable) { |
| Reactor reactor = selectable.getReactor(); |
| Transport transport = ((SelectableImpl)selectable).getTransport(); |
| int capacity = transport.capacity(); |
| if (capacity > 0) { |
| SocketChannel socketChannel = (SocketChannel)selectable.getChannel(); |
| try { |
| int n = socketChannel.read(transport.tail()); |
| if (n == -1) { |
| transport.close_tail(); |
| } else { |
| transport.process(); |
| } |
| } catch (IOException e) { |
| ErrorCondition condition = new ErrorCondition(); |
| condition.setCondition(Symbol.getSymbol("proton:io")); |
| condition.setDescription(e.getMessage()); |
| transport.setCondition(condition); |
| transport.close_tail(); |
| } |
| } |
| // (Comment from C code:) occasionally transport events aren't |
| // generated when expected, so the following hack ensures we |
| // always update the selector |
| update(selectable); |
| reactor.update(selectable); |
| } |
| }; |
| |
| // pni_connection_writable from connection.c |
| private static Callback connectionWritable = new Callback() { |
| @Override |
| public void run(Selectable selectable) { |
| Reactor reactor = selectable.getReactor(); |
| Transport transport = ((SelectableImpl)selectable).getTransport(); |
| int pending = transport.pending(); |
| if (pending > 0) { |
| SocketChannel channel = (SocketChannel)selectable.getChannel(); |
| try { |
| int n = channel.write(transport.head()); |
| if (n < 0) { |
| transport.close_head(); |
| } else { |
| transport.pop(n); |
| } |
| } catch(IOException ioException) { |
| ErrorCondition condition = new ErrorCondition(); |
| condition.setCondition(Symbol.getSymbol("proton:io")); |
| condition.setDescription(ioException.getMessage()); |
| transport.setCondition(condition); |
| transport.close_head(); |
| } |
| } |
| |
| int newPending = transport.pending(); |
| if (newPending != pending) { |
| update(selectable); |
| reactor.update(selectable); |
| } |
| } |
| }; |
| |
| // pni_connection_error from connection.c |
| private static Callback connectionError = new Callback() { |
| @Override |
| public void run(Selectable selectable) { |
| Reactor reactor = selectable.getReactor(); |
| selectable.terminate(); |
| reactor.update(selectable); |
| } |
| }; |
| |
| // pni_connection_expired from connection.c |
| private static Callback connectionExpired = new Callback() { |
| @Override |
| public void run(Selectable selectable) { |
| Reactor reactor = selectable.getReactor(); |
| Transport transport = ((SelectableImpl)selectable).getTransport(); |
| long deadline = transport.tick(reactor.now()); |
| selectable.setDeadline(deadline); |
| int c = capacity(selectable); |
| int p = pending(selectable); |
| selectable.setReading(c > 0); |
| selectable.setWriting(p > 0); |
| reactor.update(selectable); |
| } |
| }; |
| |
| private static Callback connectionFree = new Callback() { |
| @Override |
| public void run(Selectable selectable) { |
| Channel channel = selectable.getChannel(); |
| if (channel != null) { |
| try { |
| channel.close(); |
| } catch(IOException ioException) { |
| // Ignore |
| } |
| } |
| } |
| }; |
| |
| // pn_reactor_selectable_transport |
| // Note the socket argument can, validly be 'null' this is the equivalent of proton-c's PN_INVALID_SOCKET |
| protected static Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) { |
| Selectable selectable = reactor.selectable(); |
| selectable.setChannel(socket != null ? socket.getChannel() : null); |
| selectable.onReadable(connectionReadable); |
| selectable.onWritable(connectionWritable); |
| selectable.onError(connectionError); |
| selectable.onExpired(connectionExpired); |
| selectable.onFree(connectionFree); |
| ((SelectableImpl)selectable).setTransport(transport); |
| ((TransportImpl)transport).setSelectable(selectable); |
| ((TransportImpl)transport).setReactor(reactor); |
| update(selectable); |
| reactor.update(selectable); |
| return selectable; |
| } |
| |
| private void handleTransport(Reactor reactor, Event event) { |
| TransportImpl transport = (TransportImpl)event.getTransport(); |
| Selectable selectable = transport.getSelectable(); |
| if (selectable != null && !selectable.isTerminal()) { |
| update(selectable); |
| reactor.update(selectable); |
| } |
| } |
| |
| @Override |
| public void onUnhandled(Event event) { |
| try { |
| ReactorImpl reactor = (ReactorImpl)event.getReactor(); |
| Selector selector = reactor.getSelector(); |
| if (selector == null) { |
| selector = new SelectorImpl(reactor.getIO()); |
| reactor.setSelector(selector); |
| } |
| |
| Selectable selectable; |
| switch(event.getType()) { |
| case SELECTABLE_INIT: |
| selectable = event.getSelectable(); |
| selector.add(selectable); |
| break; |
| case SELECTABLE_UPDATED: |
| selectable = event.getSelectable(); |
| selector.update(selectable); |
| break; |
| case SELECTABLE_FINAL: |
| selectable = event.getSelectable(); |
| selector.remove(selectable); |
| selectable.release(); |
| break; |
| case CONNECTION_LOCAL_OPEN: |
| handleOpen(reactor, event); |
| break; |
| case CONNECTION_BOUND: |
| handleBound(reactor, event); |
| break; |
| case TRANSPORT: |
| handleTransport(reactor, event); |
| break; |
| case TRANSPORT_CLOSED: |
| event.getTransport().unbind(); |
| break; |
| case REACTOR_QUIESCED: |
| handleQuiesced(reactor, selector); |
| break; |
| default: |
| break; |
| } |
| } catch(IOException ioException) { |
| // XXX: Might not be the right exception type, but at least the exception isn't being swallowed |
| throw new ReactorInternalException(ioException); |
| } |
| } |
| } |