package org.apache.uima.ducc.agent.deploy.uima;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.agent.deploy.ManagedService;
import org.apache.uima.ducc.agent.deploy.ServiceAdapter;
import org.apache.uima.ducc.agent.deploy.ServiceStateNotificationAdapter;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.DuccExchange;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
 * Configuration class for a UIMA AS service process that exchanges messages with its DUCC agent.
 * Pulls in {@link DuccTransportConfiguration} and {@link CommonConfiguration} via {@code @Import}.
 */
@Import({ DuccTransportConfiguration.class, CommonConfiguration.class })
public class UimaAsServiceConfiguration {
// Transport configuration; managedService() uses it to create the event dispatcher.
// NOTE(review): no injection annotation is visible on these fields here - confirm how they are
// populated.
DuccTransportConfiguration transport;
// Common configuration; supplies the CamelContext and the endpoint settings read (and, for
// socket endpoints, overwritten) by managedService().
CommonConfiguration common;
// Route builder for incoming requests; assigned by managedService().
RouteBuilder routeBuilder;
// Camel context obtained from common.camelContext() in managedService(); walked by stop().
CamelContext camelContext;
// Value of the 'ducc.uima-as.swap.usage.script' system property.
@Value("#{ systemProperties['ducc.uima-as.swap.usage.script'] }")
String swapUsageCollectorScript;
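/**
 * Creates the Camel route builder that handles incoming messages.
 *
 * @param thisNodeIP
 *          - IP address of this node, handed to the {@code DuccProcessFilter}
 * @param delegate
 *          - {@code ProcessEventListener} to delegate messages to
 * @return {@code RouteBuilder} instance
 */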
public synchronized RouteBuilder routeBuilderForIncomingRequests(final String thisNodeIP,
final ProcessEventListener delegate) {
// NOTE(review): 'delegate' is not referenced in the lines visible here - confirm where the
// route that hands messages to it is defined.
return new RouteBuilder() {
// Custom filter to select messages that are targeted for this process.
// DuccProcessFilter compares the PID and node IP headers of a message against
// this process to determine if it is the target.
// NOTE(review): 'filter' is not applied in the lines visible here - confirm its use.
Predicate filter = new DuccProcessFilter(thisNodeIP);
// Route definition hook of RouteBuilder: announces the request endpoint on stdout and
// installs the exception handler below.
public void configure() throws Exception {
System.out.println("Service Wrapper Starting Request Channel on Endpoint:"
+ common.managedServiceEndpoint);
// Any Exception raised while routing is marked as handled and passed to ErrorProcessor.
onException(Exception.class).handled(true).process(new ErrorProcessor()).end();
/**
 * Camel {@link Processor} installed as the exception handler by
 * {@code routeBuilderForIncomingRequests}.
 */
public class ErrorProcessor implements Processor {
/**
 * Reads the exception Camel caught for this exchange.
 *
 * @param exchange
 *          the exchange whose {@code Exchange.EXCEPTION_CAUGHT} property holds the failure
 */
public void process(Exchange exchange) throws Exception {
// the caused by exception is stored in a property on the exchange
// NOTE(review): 'caused' is not used in the lines visible here, so the failure is neither
// logged nor rethrown - confirm this is intended.
Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
// System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!1");
// assertNotNull(caused);
// Here you can do what you want, but Camel regards this exception as handled, and
// this processor as a failure handler, so it won't do redeliveries. So this is the
// end of this route. But if we want to route it somewhere we can just get a
// producer template and send it.
// send it to our mock endpoint
// exchange.getContext().createProducerTemplate().send("mock:myerror", exchange);
/**
 * Creates the listener that wraps the given managed service.
 *
 * @param service
 *          the service passed to the {@code ProcessEventListener} constructor
 * @return a new {@code ProcessEventListener} for {@code service}
 */
private ProcessEventListener processDelegateListener(ManagedService service) {
return new ProcessEventListener(service);
/**
 * Creates the state notification adapter, a {@code ServiceAdapter} built on the given dispatcher.
 *
 * @param eventDispatcher
 *          dispatcher passed as the first constructor argument
 * @param stateUpdateEndpoint
 *          endpoint name; NOTE(review): its use is not visible in these lines - confirm
 * @return the new adapter
 */
private ServiceStateNotificationAdapter serviceAdapter(DuccEventDispatcher eventDispatcher,
String stateUpdateEndpoint) {
// NOTE(review): the remaining constructor arguments are not visible in these lines.
return new ServiceAdapter(eventDispatcher,
/**
 * Builds the {@code ManagedUimaService} for this process: resolves the node IP, prepares the
 * agent state-update endpoint and the service request endpoint, creates the event dispatcher
 * and the service, and creates the route builder for incoming requests.
 *
 * @return the newly created service
 * @throws Exception
 *           any failure during setup, rethrown unchanged
 */
public ManagedService managedService() throws Exception {
try {
// Assume IP address provided from environment. In production this
// will be the actual node IP. In testing, the IP can be virtual
// when running multiple agents on the same node. The agent is
// responsible for providing the IP in this process environment.
// Falls back to the local host address when the DUCC_IP variable is not set.
String thisNodeIP = (System.getenv(IDuccUser.EnvironmentVariable.DUCC_IP.value()) == null)
? InetAddress.getLocalHost().getHostAddress()
: System.getenv(IDuccUser.EnvironmentVariable.DUCC_IP.value());
camelContext = common.camelContext();
int serviceSocketPort = 0;
// Optional endpoint parameters, each rendered as a "?..." suffix when configured.
String agentSocketParams = "";
String jpSocketParams = "";
if (common.managedServiceEndpointParams != null) {
jpSocketParams = "?" + common.managedServiceEndpointParams;
if (common.managedProcessStateUpdateEndpointParams != null) {
agentSocketParams = "?" + common.managedProcessStateUpdateEndpointParams;
// set up agent socket endpoint where this UIMA AS service will send state updates
if (common.managedProcessStateUpdateEndpointType != null
&& common.managedProcessStateUpdateEndpointType.equalsIgnoreCase("socket")) {
// NOTE(review): updatePort is concatenated without a null check - confirm the agent always
// sets DUCC_UPDATE_PORT for socket endpoints.
String updatePort = System.getenv(IDuccUser.EnvironmentVariable.DUCC_UPDATE_PORT.value());
common.managedProcessStateUpdateEndpoint = "mina:tcp://localhost:" + updatePort
+ agentSocketParams;
// set up a socket endpoint where the UIMA AS service will receive events sent from its agent
if (common.managedServiceEndpointType != null
&& common.managedServiceEndpointType.equalsIgnoreCase("socket")) {
serviceSocketPort = Utils.findFreePort();
// service is on the same node as the agent
common.managedServiceEndpoint = "mina:tcp://localhost:" + serviceSocketPort
+ jpSocketParams;
// optionally configures Camel Context for JMS. Checks the 'agentRequestEndpoint'
// to determine type of transport. If the endpoint starts with "activemq:", a
// special ActiveMQ component will be activated to enable JMS transport
DuccEventDispatcher eventDispatcher = transport
.duccEventDispatcher(common.managedProcessStateUpdateEndpoint, camelContext);
ManagedUimaService service = new ManagedUimaService(common.saxonJarPath,
serviceAdapter(eventDispatcher, common.managedServiceEndpoint), camelContext);
System.out.println("## Agent Service State Update Endpoint:"
+ common.managedProcessStateUpdateEndpoint + " ##");
ProcessEventListener delegateListener = processDelegateListener(service);
// NOTE(review): the route builder is only stored in the field in the lines visible here -
// confirm where it is added to the Camel context.
routeBuilder = this.routeBuilderForIncomingRequests(thisNodeIP, delegateListener);
return service;
} catch (Exception e) {
// Rethrown as-is; the catch adds no handling of its own.
throw e;
/**
 * Walks the routes of the Camel context, if one has been set, and prints the id of each.
 * NOTE(review): no call that actually stops a route is visible in these lines even though the
 * message says "stopped route" - confirm.
 */
public void stop() throws Exception {
if (camelContext != null) {
for (Route route : camelContext.getRoutes()) {
System.out.println(">>> configFactory.stop() - stopped route:" + route.getId());
// The context itself is left running; the stop call below is disabled.
// camelContext.stop();
/**
 * Camel {@link Predicate} that accepts an exchange only when its PID header equals this
 * process's PID and its node IP header equals the IP this filter was created with.
 */
private class DuccProcessFilter implements Predicate {
// IP address of this node, compared against the DUCCNODEIP header of each message.
String thisNodeIP;
/**
 * @param thisNodeIP
 *          IP address that incoming messages must target
 */
public DuccProcessFilter(final String thisNodeIP) {
this.thisNodeIP = thisNodeIP;
/**
 * Tests whether the exchange targets this process.
 *
 * @param exchange
 *          exchange whose inbound headers carry the target PID and node IP
 * @return {@code true} when both headers match this process, {@code false} otherwise
 */
public synchronized boolean matches(Exchange exchange) {
// String methodName="DuccProcessFilter.matches";
boolean result = false;
try {
String pid = (String) exchange.getIn().getHeader(DuccExchange.ProcessPID);
String targetIP = (String) exchange.getIn().getHeader(DuccExchange.DUCCNODEIP);
// check if this message is targeting this process. Check if the process PID
// and the node match target process.
if (Utils.getPID().equals(pid) && thisNodeIP.equals(targetIP)) { // Get PID of this process
result = true;
System.out.println(">>>>>>>>> Process Received a Message. Is Process target for message:"
+ result + ". Target PID:" + pid);
// NOTE(review): no handling of the caught Throwable is visible in these lines; on failure
// the method returns whatever 'result' holds at that point - confirm this is intended.
} catch (Throwable e) {
return result;