blob: 86aeed544b8c01766d1153169a01e3bc7a1b1517 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.reactor;
import java.io.IOException;
import java.util.Set;
import org.apache.qpid.proton.ReactorOptions;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.reactor.impl.ReactorImpl;
/**
 * General purpose event processing library for writing reactive programs.
 * <p>
 * A reactive program is defined by a set of event handlers. An event handler
 * is any class or object that implements the {@link Handler} interface. For
 * convenience a class can extend {@link BaseHandler} and implement methods
 * only for the events it cares about.
 * <p>
 * This class is not thread safe and should only be used by a single thread
 * at any given time. The one exception is {@link #wakeup()}, which may be
 * called from another thread.
 */
public interface Reactor {

    /**
     * Entry point for obtaining {@link Reactor} instances.
     * <p>
     * As a member type of an interface this class is implicitly
     * {@code public} and {@code static}.
     */
    final class Factory {

        /**
         * Creates a reactor backed by a default-constructed
         * {@link ReactorImpl}.
         *
         * @return the newly created reactor.
         * @throws IOException if constructing the {@link ReactorImpl} fails
         *                     with an I/O error.
         */
        public static Reactor create() throws IOException {
            return new ReactorImpl();
        }

        /**
         * Creates a reactor backed by a {@link ReactorImpl} constructed with
         * the supplied options.
         *
         * @param options the options handed to the {@link ReactorImpl}
         *                constructor.
         * @return the newly created reactor.
         * @throws IOException if constructing the {@link ReactorImpl} fails
         *                     with an I/O error.
         */
        public static Reactor create(ReactorOptions options) throws IOException {
            return new ReactorImpl(options);
        }
    }

    /**
     * Updates the last time that the reactor's state has changed, which may
     * result in events being generated.
     *
     * @return the current time in milliseconds, see
     *         {@link System#currentTimeMillis()}.
     */
    long mark();

    /**
     * @return the last time that {@link #mark()} was called.
     */
    long now();

    /**
     * @return a {@link Record} that can be used to associate other objects
     *         (attachments) with this reactor instance.
     */
    Record attachments();

    /**
     * Sets the value the reactor will use for {@link Selector#select(long)},
     * which is called as part of {@link #process()}.
     *
     * @param timeout a timeout value, in milliseconds, to associate with this
     *                reactor instance. It can be read back with
     *                {@link #getTimeout()}.
     */
    void setTimeout(long timeout);

    /**
     * @return the value previously supplied to {@link #setTimeout(long)}, or
     *         0 if no value has been set.
     */
    long getTimeout();

    /**
     * Returns the global handler for this reactor. Every event the reactor
     * sees is dispatched to the global handler. To receive every event
     * generated by the reactor, associate a child handler with the global
     * handler. For example:
     * <pre>
     *     getGlobalHandler().add(yourHandler);
     * </pre>
     *
     * @return the global handler for this reactor.
     */
    Handler getGlobalHandler();

    /**
     * Sets a new global handler. You probably don't want to do this; it is
     * usually better to add a handler to the value returned by
     * {@link #getGlobalHandler()}.
     *
     * @param handler the new global handler.
     */
    void setGlobalHandler(Handler handler);

    /**
     * Returns the handler for this reactor. Every event the reactor sees
     * that is not handled by a child of the reactor (such as a timer,
     * connection, acceptor, or selector) is passed to this handler. To
     * receive these events it is recommended that you associate a child
     * handler with the handler returned by this method. For example:
     * <pre>
     *     getHandler().add(yourHandler);
     * </pre>
     *
     * @return the handler for this reactor.
     */
    Handler getHandler();

    /**
     * Sets a new handler that will receive any events not handled by a child
     * of the reactor.
     * <p>
     * Setting a handler through this method replaces the previous handler,
     * so no further events are dispatched to the child handlers associated
     * with the previous handler. For this reason it is recommended that you
     * do not use this method and instead add child handlers to the value
     * returned by {@link #getHandler()}.
     *
     * @param handler the new handler for this reactor.
     */
    void setHandler(Handler handler);

    /**
     * Returns the child objects associated with this reactor. The set
     * contains any active instances of:
     * <ul>
     *   <li>{@link Task} - created using {@link #schedule(int, Handler)};</li>
     *   <li>{@link Connection} - created using
     *       {@link #connectionToHost(String, int, Handler)};</li>
     *   <li>{@link Acceptor} - created using {@link #acceptor(String, int)}
     *       or {@link #acceptor(String, int, Handler)};</li>
     *   <li>{@link Selectable} - created using {@link #selectable()}.</li>
     * </ul>
     *
     * @return a set containing the child objects associated with this
     *         reactor.
     */
    Set<ReactorChild> children();

    /**
     * @return the {@link Collector} used to gather events generated by this
     *         reactor.
     */
    Collector collector();

    /**
     * Creates a new {@code Selectable} as a child of this reactor.
     *
     * @return the newly created {@code Selectable}.
     */
    Selectable selectable();

    /**
     * Updates the specified {@code Selectable}. A
     * {@link Type#SELECTABLE_UPDATED} event is emitted if the selectable is
     * not terminal. If it is terminal and has not already emitted a
     * {@link Type#SELECTABLE_FINAL} event, a {@link Type#SELECTABLE_FINAL}
     * event is emitted instead.
     *
     * @param selectable the selectable to update.
     */
    void update(Selectable selectable);

    /**
     * Yields, causing the next call to {@link #process()} to return
     * successfully without processing any events. Multiple calls can be made
     * to {@code yield}, and only the next invocation of {@link #process()}
     * will be affected.
     */
    void yield();

    /**
     * @return {@code true} if the reactor is in a quiesced state (e.g. has no
     *         events to process), {@code false} otherwise.
     */
    boolean quiesced();

    /**
     * Processes any events pending for this reactor. Events are dispatched
     * to the handlers registered with the reactor, or to child objects
     * associated with the reactor. This method blocks until the reactor has
     * no more work to do (and no more work pending, in terms of scheduled
     * tasks or open selectors to process).
     *
     * @return {@code true} if the reactor may have more events in the future,
     *         for example if there are scheduled tasks or open selectors;
     *         {@code false} if the reactor has (and will have) no more events
     *         to process.
     * @throws HandlerException if an unchecked exception is thrown by one of
     *         the handlers - it is re-thrown attached to an instance of
     *         {@code HandlerException}.
     */
    boolean process() throws HandlerException;

    /**
     * Wakes up the thread (if any) blocked in the {@link #process()} method.
     * This is the only method of this class that is thread safe, in that it
     * can be used at the same time as another thread is using the reactor.
     */
    void wakeup();

    /**
     * Starts the reactor. This method should be invoked before the first
     * call to {@link #process()}.
     */
    void start();

    /**
     * Stops the reactor. This method should be invoked after the last call
     * to {@link #process()}.
     *
     * @throws HandlerException presumably when a handler throws an unchecked
     *         exception while the reactor is stopping, as described for
     *         {@link #process()} - the exact condition is not specified here.
     */
    void stop() throws HandlerException;

    /**
     * Simplifies the use of the reactor by wrapping the {@code start},
     * {@code process}, and {@code stop} method calls.
     * <p>
     * Logically the implementation of this method is:
     * <pre>
     *     start();
     *     while(process()) {}
     *     stop();
     * </pre>
     *
     * @throws HandlerException if an unchecked exception is thrown by one of
     *         the handlers - it is re-thrown attached to an instance of
     *         {@code HandlerException}.
     */
    void run() throws HandlerException;

    /**
     * Schedules execution of a task to take place at some point in the
     * future.
     *
     * @param delay   the number of milliseconds, in the future, to schedule
     *                the task for.
     * @param handler a handler to associate with the task. It is notified
     *                when the deadline for the task is reached.
     * @return an object representing the task that has been scheduled.
     */
    Task schedule(int delay, Handler handler);

    /**
     * Creates a new out-bound connection.
     *
     * @param handler a handler that is notified when events occur for the
     *                connection. Typically the host and port to connect to
     *                would be supplied to the connection object inside the
     *                logic which handles the {@link Type#CONNECTION_INIT}
     *                event, via
     *                {@link #setConnectionHost(Connection, String, int)}.
     * @return the newly created connection object.
     * @deprecated Use {@link #connectionToHost(String, int, Handler)} instead.
     */
    @Deprecated
    Connection connection(Handler handler);

    /**
     * Creates a new out-bound connection to the given host and port.
     * <p>
     * This method causes the reactor to set up a network connection to the
     * host and create a {@link Connection} for it.
     *
     * @param host    the host to connect to (e.g. "localhost").
     * @param port    the port used for the connection.
     * @param handler a handler that is notified when events occur for the
     *                connection.
     * @return the newly created connection object.
     */
    Connection connectionToHost(String host, int port, Handler handler);

    /**
     * Sets the host address used by the connection.
     * <p>
     * This method sets or changes the host address used by the reactor to
     * create an outbound network connection for the given
     * {@link Connection}.
     *
     * @param c    the connection to assign the address to.
     * @param host the address of the host to connect to (e.g. "localhost").
     * @param port the port to use for the connection.
     */
    void setConnectionHost(Connection c, String host, int port);

    /**
     * Gets the address used by the connection.
     * <p>
     * This may be used to retrieve the remote peer address. Note that the
     * returned address may be in numeric IP format.
     *
     * @param c the connection.
     * @return a string containing the address in the following format:
     * <pre>
     *     host[:port]
     * </pre>
     */
    String getConnectionAddress(Connection c);

    /**
     * Creates a new acceptor. This is equivalent to calling:
     * <pre>
     *     acceptor(host, port, null);
     * </pre>
     *
     * @param host the host name or address of the NIC to listen on.
     * @param port the port number to listen on.
     * @return the newly created acceptor object.
     * @throws IOException if an I/O error occurs while creating the acceptor.
     */
    Acceptor acceptor(String host, int port) throws IOException;

    /**
     * Creates a new acceptor. This acceptor listens for in-bound
     * connections.
     *
     * @param host    the host name or address of the NIC to listen on.
     * @param port    the port number to listen on.
     * @param handler if non-{@code null}, this handler is registered with
     *                each new connection accepted by the acceptor.
     * @return the newly created acceptor object.
     * @throws IOException if an I/O error occurs while creating the acceptor.
     */
    Acceptor acceptor(String host, int port, Handler handler)
            throws IOException;

    /**
     * Frees any resources (such as sockets and selectors) held by the
     * reactor or its children.
     */
    void free();
}