blob: 0a75f53de57bf1a4dfff22cc9e483b510a8122dc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
[[ugr.tug.fc]]
= Flow Controller Developer's Guide
A Flow Controller is a component that plugs into an Aggregate Analysis Engine.
When a CAS is input to the Aggregate, the Flow Controller determines the order in which the components of that aggregate are invoked on that CAS.
The ability to provide your own Flow Controller implementation is new as of release 2.0 of UIMA.
Flow Controllers may decide the flow dynamically, based on the contents of the CAS.
So, as just one example, you could develop a Flow Controller that first sends each CAS to a Language Identification Annotator and then, based on the output of the Language Identification Annotator, routes that CAS to an Annotator that is specialized for that particular language.
[[ugr.tug.fc.developing_fc_code]]
== Developing the Flow Controller Code
[[ugr.tug.fc.fc_interface_overview]]
=== Flow Controller Interface Overview
Flow Controller implementations should extend from the `JCasFlowController_ImplBase` or `CasFlowController_ImplBase` classes, depending on which CAS interface they prefer to use.
As with other types of components, the Flow Controller ImplBase classes define optional ``initialize``, ``destroy``, and `reconfigure` methods.
They also define the required method ``computeFlow``.
The `computeFlow` method is called by the framework whenever a new CAS enters the Aggregate Analysis Engine.
It is given the CAS as an argument and must return an object which implements the `Flow` interface (the Flow object). The Flow Controller developer must define this object.
It is the object that is responsible for routing this particular CAS through the components of the Aggregate Analysis Engine.
For convenience, the framework provides basic implementation of flow objects in the classes CasFlow_ImplBase and JCasFlow_ImplBase; use the JCas one if you are using the JCas interface to the CAS.
The framework then uses the Flow object and calls its `next()` method, which returns a `Step` object (implemented by the UIMA Framework) that indicates what to do next with this CAS next.
There are three types of steps currently supported:
* ``SimpleStep``, which specifies a single Analysis Engine that should receive the CAS next.
* ``ParallelStep``, which specifies that multiple Analysis Engines should receive the CAS next, and that the relative order in which these Analysis Engines execute does not matter. Logically, they can run in parallel. The runtime is not obligated to actually execute them in parallel, however, and the current implementation will execute them serially in an arbitrary order.
* ``FinalStep``, which indicates that the flow is completed.
After executing the step, the framework will call the Flow object's `next()` method again to determine the next destination, and this will be repeated until the Flow Object indicates that processing is complete by returning a ``FinalStep``.
The Flow Controller has access to a ``FlowControllerContext``, which is a subtype of ``UimaContext``.
In addition to the configuration parameter and resource access provided by a ``UimaContext``, the `FlowControllerContext` also gives access to the metadata for all of the Analysis Engines that the Flow Controller can route CASes to.
Most Flow Controllers will need to use this information to make routing decisions.
You can get a handle to the `FlowControllerContext` by calling the `getContext()` method defined in `JCasFlowController_ImplBase` and ``CasFlowController_ImplBase``.
Then, the `FlowControllerContext.getAnalysisEngineMetaDataMap` method can be called to get a map containing an entry for each of the Analysis Engines in the Aggregate.
The keys in this map are the same as the delegate analysis engine keys specified in the aggregate descriptor, and the values are the corresponding `AnalysisEngineMetaData` objects.
Finally, the Flow Controller has optional methods `addAnalysisEngines` and ``removeAnalysisEngines``.
These methods are intended to notify the Flow Controller if new Analysis Engines are available to route CASes to, or if previously available Analysis Engines are no longer available.
However, the current version of the Apache UIMA framework does not support dynamically adding or removing Analysis Engines to/from an aggregate, so these methods are not currently called.
Future versions may support this feature.
[[ugr.tug.fc.example_code]]
=== Example Code
This section walks through the source code of an example Flow Controller that simluates a simple version of the "`Whiteboard`" flow model.
At each step of the flow, the Flow Controller looks it all of the available Analysis Engines that have not yet run on this CAS, and picks one whose input requirements are satisfied.
The Java class for the example is `org.apache.uima.examples.flow.WhiteboardFlowController` and the source code is included in the UIMA SDK under the `examples/src` directory.
[[ugr.tug.fc.whiteboard]]
==== The WhiteboardFlowController Class
[source]
----
public class WhiteboardFlowController
extends CasFlowController_ImplBase {
public Flow computeFlow(CAS aCAS)
throws AnalysisEngineProcessException {
WhiteboardFlow flow = new WhiteboardFlow();
// As of release 2.3.0, the following is not needed,
// because the framework does this automatically
// flow.setCas(aCAS);
return flow;
}
class WhiteboardFlow extends CasFlow_ImplBase {
// Discussed Later
}
}
----
The `WhiteboardFlowController` extends from `CasFlowController_ImplBase` and implements the `computeFlow` method.
The implementation of the `computeFlow` method is very simple; it just constructs a new `WhiteboardFlow` object that will be responsible for routing this CAS.
The framework will add a handle to that CAS which it will later use to make its routing decisions.
Note that we will have one instance of `WhiteboardFlow` per CAS, so if there are multiple CASes being simultaneously processed there will not be any confusion.
[[ugr.tug.fc.whiteboardflow]]
==== The WhiteboardFlow Class
[source]
----
class WhiteboardFlow extends CasFlow_ImplBase {
private Set mAlreadyCalled = new HashSet();
public Step next() throws AnalysisEngineProcessException {
// Get the CAS that this Flow object is responsible for routing.
// Each Flow instance is responsible for a single CAS.
CAS cas = getCas();
// iterate over available AEs
Iterator aeIter = getContext().getAnalysisEngineMetaDataMap().
entrySet().iterator();
while (aeIter.hasNext()) {
Map.Entry entry = (Map.Entry) aeIter.next();
// skip AEs that were already called on this CAS
String aeKey = (String) entry.getKey();
if (!mAlreadyCalled.contains(aeKey)) {
// check for satisfied input capabilities
//(i.e. the CAS contains at least one instance
// of each required input
AnalysisEngineMetaData md =
(AnalysisEngineMetaData) entry.getValue();
Capability[] caps = md.getCapabilities();
boolean satisfied = true;
for (int i = 0; i < caps.length; i++) {
satisfied = inputsSatisfied(caps[i].getInputs(), cas);
if (satisfied)
break;
}
if (satisfied) {
mAlreadyCalled.add(aeKey);
if (mLogger.isLoggable(Level.FINEST)) {
getContext().getLogger().log(Level.FINEST,
"Next AE is: " + aeKey);
}
return new SimpleStep(aeKey);
}
}
}
// no appropriate AEs to call - end of flow
getContext().getLogger().log(Level.FINEST, "Flow Complete.");
return new FinalStep();
}
private boolean inputsSatisfied(TypeOrFeature[] aInputs, CAS aCAS) {
//implementation detail; see the actual source code
}
}
----
Each instance of the `WhiteboardFlowController` is responsible for routing a single CAS.
A handle to the CAS instance is available by calling the `getCas()` method, which is a standard method defined on the ``CasFlow_ImplBase ``superclass.
Each time the `next` method is called, the Flow object iterates over the metadata of all of the available Analysis Engines (obtained via the call to `getContext().
getAnalysisEngineMetaDataMap)` and sees if the input types declared in an AnalysisEngineMetaData object are satisfied by the CAS (that is, the CAS contains at least one instance of each declared input type). The exact details of checking for instances of types in the CAS are not discussed here – see the WhiteboardFlowController.java file for the complete source.
When the Flow object decides which AnalysisEngine should be called next, it indicates this by creating a SimpleStep object with the key for that AnalysisEngine and returning it:
[source]
----
return new SimpleStep(aeKey);
----
The Flow object keeps a list of which Analysis Engines it has invoked in the `mAlreadyCalled` field, and never invokes the same Analysis Engine twice.
Note this is not a hard requirement.
It is acceptable to design a FlowController that invokes the same Analysis Engine more than once.
However, if you do this you must make sure that the flow will eventually terminate.
If there are no Analysis Engines left whose input requirements are satisfied, the Flow object signals the end of the flow by returning a FinalStep object:
[source]
----
return new FinalStep();
----
Also, note the use of the logger to write tracing messages indicating the decisions made by the Flow Controller.
This is a good practice that helps with debugging if the Flow Controller is behaving in an unexpected way.
[[ugr.tug.fc.creating_fc_descriptor]]
== Creating the Flow Controller Descriptor
To create a Flow Controller Descriptor in the CDE, use File →New →Other →UIMA →Flow Controller Descriptor File:
image::images/tutorials_and_users_guides/tug.fc/image002.jpg[Screenshot of Eclipse new object wizard showing Flow Controller]
This will bring up the Overview page for the Flow Controller Descriptor:
image::images/tutorials_and_users_guides/tug.fc/image004.jpg[Screenshot of Component Descriptor Editor Overview page for new Flow Controller]
Type in the Java class name that implements the Flow Controller, or use the "`Browse`" button to select it.
You must select a Java class that implements the `FlowController` interface.
Flow Controller Descriptors are very similar to Primitive Analysis Engine Descriptors –for example you can specify configuration parameters and external resources if you wish.
If you wish to edit a Flow Controller Descriptor by hand, see xref:ref.adoc#ugr.ref.xml.component_descriptor.flow_controller[Flow Controller Descriptor Reference] for the syntax.
[[ugr.tug.fc.adding_fc_to_aggregate]]
== Adding a Flow Controller to an Aggregate Analysis Engine
// <titleabbrev>Adding Flow Controller to an Aggregate</titleabbrev>
To use a Flow Controller you must add it to an Aggregate Analysis Engine.
You can only have one Flow Controller per Aggregate Analysis Engine.
In the Component Descriptor Editor, the Flow Controller is specified on the Aggregate page, as a choice in the flow control kind - pick "`User-defined Flow`".
When you do, the Browse and Search buttons underneath become active, and allow you to specify an existing Flow Controller Descriptor, which when you select it, will be imported into the aggregate descriptor.
image::images/tutorials_and_users_guides/tug.fc/image006.jpg[Screenshot of Component Descriptor Editor Aggregate page showing selecting user-defined flow]
The key name is created automatically from the name element in the Flow Controller Descriptor being imported.
If you need to change this name, you can do so by switching to the "`Source`" view using the bottom tabs, and editing the name in the XML source.
If you edit your Aggregate Analysis Engine Descriptor by hand, the syntax for adding a Flow Controller is:
[source]
----
<delegateAnalysisEngineSpecifiers>
...
</delegateAnalysisEngineSpecifiers>
<flowController key=[String]>
<import .../>
</flowController>
----
As usual, you can xref:ref.adoc#ugr.ref.xml.component_descriptor.imports[import] either by location or by name.
The key that you assign to the FlowController can be used elsewhere in the Aggregate Analysis Engine Descriptor -- in parameter overrides, resource bindings, and Sofa mappings.
[[ugr.tug.fc.adding_fc_to_cpe]]
== Adding a Flow Controller to a Collection Processing Engine
// <titleabbrev>Adding Flow Controller to CPE</titleabbrev>
Flow Controllers cannot be added directly to Collection Processing Engines.
To use a Flow Controller in a CPE you first need to wrap the part of your CPE that requires complex flow control into an Aggregate Analysis Engine, and then add the Aggregate Analysis Engine to your CPE.
The CPE's deployment and error handling options can then only be configured for the entire Aggregate Analysis Engine as a unit.
[[ugr.tug.fc.using_fc_with_cas_multipliers]]
== Using Flow Controllers with CAS Multipliers
If you want your Flow Controller to work inside an Aggregate Analysis Engine that contains a xref:tug.adoc#ugr.tug.cm[CAS Multiplier], there are additional things you must consider.
When your Flow Controller routes a CAS to a CAS Multiplier, the CAS Multiplier may produce new CASes that then will also need to be routed by the Flow Controller.
When a new output CAS is produced, the framework will call the `newCasProduced` method on the Flow object that was managing the flow of the parent CAS (the one that was input to the CAS Multiplier). The `newCasProduced` method must create a new Flow object that will be responsible for routing the new output CAS.
In the `CasFlow_ImplBase` and `JCasFlow_ImplBase` classes, the `newCasProduced` method is defined to throw an exception indicating that the Flow Controller does not handle CAS Multipliers.
If you want your Flow Controller to properly deal with CAS Multipliers you must override this method.
If your Flow class extends ``CasFlow_ImplBase``, the method signature to override is:
[source]
----
protected Flow newCasProduced(CAS newOutputCas, String producedBy)
----
If your Flow class extends ``JCasFlow_ImplBase``, the method signature to override is:
[source]
----
protected Flow newCasProduced(JCas newOutputCas, String producedBy)
----
Also, there is a variant of `FinalStep` which can only be specified for output CASes produced by CAS Multipliers within the Aggregate Analysis Engine containing the Flow Controller.
This version of `FinalStep` is produced by the calling the constructor with a `true` argument, and it causes the CAS to be immediately released back to the pool.
No further processing will be done on it and it will not be output from the aggregate.
This is the way that you can build an Aggregate Analysis Engine that outputs some new CASes but not others.
Note that if you never want any new CASes to be output from the Aggregate Analysis Engine, you don't need to use this; instead just declare `<outputsNewCASes>false</outputsNewCASes>` in your xref:tug.adoc#ugr.tug.cm.aggregate_cms[Aggregate Analysis Engine Descriptor].
For more information on how CAS Multipliers interact with Flow Controllers, <<ugr.tug.cm.cm_and_fc>>.
[[ugr.tug.fc.continuing_when_exceptions_occur]]
== Continuing the Flow When Exceptions Occur
If an exception occurs when processing a CAS, the framework may call the method
[source]
----
boolean continueOnFailure(String failedAeKey, Exception failure)
----
on the Flow object that was managing the flow of that CAS.
If this method returns ``true``, then the framework may continue to call the `next()` method to continue routing the CAS.
If this method returns `false` (the default), the framework will not make any more calls to the `next()` method.
In the case where the last Step was a ParallelStep, if at least one of the destinations resulted in a failure, then `continueOnFailure` will be called to report one of the failures.
If this method returns true, but one of the other destinations in the ParallelStep resulted in a failure, then the `continueOnFailure` method will be called again to report the next failure.
This continues until either this method returns false or there are no more failures.
Note that it is possible for processing of a CAS to be aborted without this method being called.
This method is only called when an attempt is being made to continue processing of the CAS following an exception, which may be an application configuration decision.
In any case, if processing is aborted by the framework for any reason, including because `continueOnFailure` returned false, the framework will call the `Flow.aborted()` method to allow the Flow object to clean up any resources.
For an example of how to continue after an exception, see the example code ``org.apache.uima.examples.flow.AdvancedFixedFlowController``, in the `examples/src` directory of the UIMA SDK.
This exampe also demonstrates the use of ``ParallelStep``.