| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.qpid.proton.reactor.impl; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ClosedChannelException; |
| import java.nio.channels.Pipe; |
| import java.util.HashSet; |
| import java.util.Set; |
| |
| import org.apache.qpid.proton.Proton; |
| import org.apache.qpid.proton.engine.BaseHandler; |
| import org.apache.qpid.proton.engine.Collector; |
| import org.apache.qpid.proton.engine.Connection; |
| import org.apache.qpid.proton.engine.Event; |
| import org.apache.qpid.proton.engine.Event.Type; |
| import org.apache.qpid.proton.engine.EventType; |
| import org.apache.qpid.proton.engine.Extendable; |
| import org.apache.qpid.proton.engine.ExtendableAccessor; |
| import org.apache.qpid.proton.engine.Handler; |
| import org.apache.qpid.proton.engine.HandlerException; |
| import org.apache.qpid.proton.engine.Record; |
| import org.apache.qpid.proton.engine.impl.CollectorImpl; |
| import org.apache.qpid.proton.engine.impl.ConnectionImpl; |
| import org.apache.qpid.proton.engine.impl.RecordImpl; |
| import org.apache.qpid.proton.reactor.Acceptor; |
| import org.apache.qpid.proton.reactor.impl.AcceptorImpl; |
| import org.apache.qpid.proton.reactor.Reactor; |
| import org.apache.qpid.proton.reactor.ReactorChild; |
| import org.apache.qpid.proton.reactor.Selectable; |
| import org.apache.qpid.proton.reactor.Selectable.Callback; |
| import org.apache.qpid.proton.reactor.Selector; |
| import org.apache.qpid.proton.reactor.Task; |
| |
| @SuppressWarnings("deprecation") |
| public class ReactorImpl implements Reactor, Extendable { |
| public static final ExtendableAccessor<Event, Handler> ROOT = new ExtendableAccessor<>(Handler.class); |
| |
| private CollectorImpl collector; |
| private long now; |
| private long timeout; |
| private Handler global; |
| private Handler handler; |
| private Set<ReactorChild> children; |
| private int selectables; |
| private boolean yield; |
| private boolean stop; |
| private Selectable selectable; |
| private EventType previous; |
| private Timer timer; |
| private final Pipe wakeup; |
| private Selector selector; |
| private Record attachments; |
| private final IO io; |
| protected static final String CONNECTION_PEER_ADDRESS_KEY = "pn_reactor_connection_peer_address"; |
| |
| @Override |
| public long mark() { |
| now = System.currentTimeMillis(); |
| return now; |
| } |
| |
| @Override |
| public long now() { |
| return now; |
| } |
| |
| protected ReactorImpl(IO io) throws IOException { |
| collector = (CollectorImpl)Proton.collector(); |
| global = new IOHandler(); |
| handler = new BaseHandler(); |
| children = new HashSet<ReactorChild>(); |
| selectables = 0; |
| timer = new Timer(collector); |
| this.io = io; |
| wakeup = this.io.pipe(); |
| mark(); |
| attachments = new RecordImpl(); |
| } |
| |
| public ReactorImpl() throws IOException { |
| this(new IOImpl()); |
| } |
| |
| @Override |
| public void free() { |
| if (wakeup.source().isOpen()) { |
| try { |
| wakeup.source().close(); |
| } catch(IOException e) { |
| // Ignore. |
| } |
| } |
| if (wakeup.sink().isOpen()) { |
| try { |
| wakeup.sink().close(); |
| } catch(IOException e) { |
| // Ignore |
| } |
| } |
| |
| if (selector != null) { |
| selector.free(); |
| } |
| |
| for (ReactorChild child : children) { |
| child.free(); |
| } |
| } |
| |
| @Override |
| public Record attachments() { |
| return attachments; |
| } |
| |
| @Override |
| public long getTimeout() { |
| return timeout; |
| } |
| |
| @Override |
| public void setTimeout(long timeout) { |
| this.timeout = timeout; |
| } |
| |
| @Override |
| public Handler getGlobalHandler() { |
| return global; |
| } |
| |
| @Override |
| public void setGlobalHandler(Handler handler) { |
| global = handler; |
| } |
| |
| @Override |
| public Handler getHandler() { |
| return handler; |
| } |
| |
| @Override |
| public void setHandler(Handler handler) { |
| this.handler = handler; |
| } |
| |
| @Override |
| public Set<ReactorChild> children() { |
| return children; |
| } |
| |
| @Override |
| public Collector collector() { |
| return collector; |
| } |
| |
| private class ReleaseCallback implements Callback { |
| private final ReactorImpl reactor; |
| private final ReactorChild child; |
| public ReleaseCallback(ReactorImpl reactor, ReactorChild child) { |
| this.reactor = reactor; |
| this.child = child; |
| } |
| @Override |
| public void run(Selectable selectable) { |
| if (reactor.children.remove(child)) { |
| --reactor.selectables; |
| child.free(); |
| } |
| } |
| } |
| |
| @Override |
| public Selectable selectable() { |
| return selectable(null); |
| } |
| |
| public SelectableImpl selectable(ReactorChild child) { |
| SelectableImpl result = new SelectableImpl(); |
| result.setCollector(collector); |
| collector.put(Type.SELECTABLE_INIT, result); |
| result.setReactor(this); |
| children.add(child == null ? result : child); |
| result.onRelease(new ReleaseCallback(this, child == null ? result : child)); |
| ++selectables; |
| return result; |
| } |
| |
| @Override |
| public void update(Selectable selectable) { |
| SelectableImpl selectableImpl = (SelectableImpl)selectable; |
| if (!selectableImpl.isTerminated()) { |
| if (selectableImpl.isTerminal()) { |
| selectableImpl.terminated(); |
| collector.put(Type.SELECTABLE_FINAL, selectable); |
| } else { |
| collector.put(Type.SELECTABLE_UPDATED, selectable); |
| } |
| } |
| } |
| |
| // pn_event_handler |
| private Handler eventHandler(Event event) { |
| Handler result; |
| if (event.getLink() != null) { |
| result = BaseHandler.getHandler(event.getLink()); |
| if (result != null) return result; |
| } |
| if (event.getSession() != null) { |
| result = BaseHandler.getHandler(event.getSession()); |
| if (result != null) return result; |
| } |
| if (event.getConnection() != null) { |
| result = BaseHandler.getHandler(event.getConnection()); |
| if (result != null) return result; |
| } |
| |
| if (event.getTask() != null) { |
| result = BaseHandler.getHandler(event.getTask()); |
| if (result != null) return result; |
| } |
| |
| if (event.getSelectable() != null) { |
| result = BaseHandler.getHandler(event.getSelectable()); |
| if (result != null) return result; |
| } |
| |
| return handler; |
| } |
| |
| |
| @Override |
| public void yield() { |
| yield = true; |
| } |
| |
| @Override |
| public boolean quiesced() { |
| Event event = collector.peek(); |
| if (event == null) return true; |
| if (collector.more()) return false; |
| return event.getEventType() == Type.REACTOR_QUIESCED; |
| } |
| |
| @Override |
| public boolean process() throws HandlerException { |
| mark(); |
| EventType previous = null; |
| while (true) { |
| Event event = collector.peek(); |
| if (event != null) { |
| if (yield) { |
| yield = false; |
| return true; |
| } |
| Handler handler = eventHandler(event); |
| dispatch(event, handler); |
| dispatch(event, global); |
| |
| if (event.getEventType() == Type.CONNECTION_FINAL) { |
| children.remove(event.getConnection()); |
| } |
| this.previous = event.getEventType(); |
| previous = this.previous; |
| collector.pop(); |
| |
| } else { |
| if (!stop && more()) { |
| if (previous != Type.REACTOR_QUIESCED && this.previous != Type.REACTOR_FINAL) { |
| collector.put(Type.REACTOR_QUIESCED, this); |
| } else { |
| return true; |
| } |
| } else { |
| if (selectable != null) { |
| selectable.terminate(); |
| update(selectable); |
| selectable = null; |
| } else { |
| collector.put(Type.REACTOR_FINAL, this); |
| return false; |
| } |
| } |
| } |
| } |
| } |
| |
| private void dispatch(Event event, Handler handler) { |
| ROOT.set(event, handler); |
| event.dispatch(handler); |
| } |
| |
| @Override |
| public void wakeup() { |
| try { |
| wakeup.sink().write(ByteBuffer.allocate(1)); |
| } catch(ClosedChannelException channelClosedException) { |
| // Ignore - pipe already closed by reactor being shutdown. |
| } catch(IOException ioException) { |
| throw new ReactorInternalException(ioException); |
| } |
| } |
| |
| @Override |
| public void start() { |
| collector.put(Type.REACTOR_INIT, this); |
| selectable = timerSelectable(); |
| } |
| |
| @Override |
| public void stop() throws HandlerException { |
| stop = true; |
| } |
| |
| private boolean more() { |
| return timer.tasks() > 0 || selectables > 1; |
| } |
| |
| @Override |
| public void run() throws HandlerException { |
| setTimeout(3141); |
| start(); |
| while(process()) {} |
| stop(); |
| process(); |
| collector = null; |
| } |
| |
| // pn_reactor_schedule from reactor.c |
| @Override |
| public Task schedule(int delay, Handler handler) { |
| Task task = timer.schedule(now + delay); |
| ((TaskImpl)task).setReactor(this); |
| BaseHandler.setHandler(task, handler); |
| if (selectable != null) { |
| selectable.setDeadline(timer.deadline()); |
| update(selectable); |
| } |
| return task; |
| } |
| |
| private void expireSelectable(Selectable selectable) { |
| ReactorImpl reactor = (ReactorImpl) selectable.getReactor(); |
| reactor.timer.tick(reactor.now); |
| selectable.setDeadline(reactor.timer.deadline()); |
| reactor.update(selectable); |
| } |
| |
| private class TimerReadable implements Callback { |
| |
| @Override |
| public void run(Selectable selectable) { |
| try { |
| wakeup.source().read(ByteBuffer.allocate(64)); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| expireSelectable(selectable); |
| } |
| |
| } |
| |
| private class TimerExpired implements Callback { |
| @Override |
| public void run(Selectable selectable) { |
| expireSelectable(selectable); |
| } |
| } |
| |
| |
| // pni_timer_finalize from reactor.c |
| private static class TimerFree implements Callback { |
| @Override |
| public void run(Selectable selectable) { |
| try { |
| selectable.getChannel().close(); |
| } catch(IOException ioException) { |
| // Ignore |
| } |
| } |
| } |
| |
| private Selectable timerSelectable() { |
| Selectable sel = selectable(); |
| sel.setChannel(wakeup.source()); |
| sel.onReadable(new TimerReadable()); |
| sel.onExpired(new TimerExpired()); |
| sel.onFree(new TimerFree()); |
| sel.setReading(true); |
| sel.setDeadline(timer.deadline()); |
| update(sel); |
| return sel; |
| } |
| |
| protected Selector getSelector() { |
| return selector; |
| } |
| |
| protected void setSelector(Selector selector) { |
| this.selector = selector; |
| } |
| |
| // pn_reactor_connection from connection.c |
| @Override |
| public Connection connection(Handler handler) { |
| Connection connection = Proton.connection(); |
| BaseHandler.setHandler(connection, handler); |
| connection.collect(collector); |
| children.add(connection); |
| ((ConnectionImpl)connection).setReactor(this); |
| return connection; |
| } |
| |
| @Override |
| public Connection connectionToHost(String host, int port, Handler handler) { |
| Connection connection = connection(handler); |
| setConnectionHost(connection, host, port); |
| return connection; |
| } |
| |
| @Override |
| public String getConnectionAddress(Connection connection) { |
| Record r = connection.attachments(); |
| Address addr = r.get(CONNECTION_PEER_ADDRESS_KEY, Address.class); |
| if (addr != null) { |
| StringBuilder sb = new StringBuilder(addr.getHost()); |
| if (addr.getPort() != null) |
| sb.append(":" + addr.getPort()); |
| return sb.toString(); |
| } |
| return null; |
| } |
| |
| @Override |
| public void setConnectionHost(Connection connection, |
| String host, int port) { |
| Record r = connection.attachments(); |
| // cannot set the address on an incoming connection |
| if (r.get(AcceptorImpl.CONNECTION_ACCEPTOR_KEY, Acceptor.class) == null) { |
| Address addr = new Address(); |
| addr.setHost(host); |
| if (port == 0) { |
| port = 5672; |
| } |
| addr.setPort(Integer.toString(port)); |
| r.set(CONNECTION_PEER_ADDRESS_KEY, Address.class, addr); |
| } else { |
| throw new IllegalStateException("Cannot set the host address on an incoming Connection"); |
| } |
| } |
| |
| @Override |
| public Acceptor acceptor(String host, int port) throws IOException { |
| return this.acceptor(host, port, null); |
| } |
| |
| @Override |
| public Acceptor acceptor(String host, int port, Handler handler) throws IOException { |
| return new AcceptorImpl(this, host, port, handler); |
| } |
| |
| public IO getIO() { |
| return io; |
| } |
| } |