blob: 20c726f63d4f2202969069a3d0bcfb40cad066bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.pm.config;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.xstream.XStreamDataFormat;
import org.apache.camel.impl.DefaultClassResolver;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.config.DuccBlastGuardPredicate;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.XStreamUtils;
import org.apache.uima.ducc.pm.ProcessManager;
import org.apache.uima.ducc.pm.ProcessManagerComponent;
import org.apache.uima.ducc.pm.event.ProcessManagerEventListener;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.apache.uima.ducc.transport.event.PmStateDuccEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.security.AnyTypePermission;
/**
* A {@link ProcessManagerConfiguration} to configure Process Manager component. Depends on
* properties loaded by a main program into System properties.
*
*/
@Configuration
@Import({DuccTransportConfiguration.class,CommonConfiguration.class})
public class ProcessManagerConfiguration {
private static DuccLogger logger = new DuccLogger(ProcessManagerConfiguration.class, "ProcessManagerConfiguration");
// use Spring magic to autowire (instantiate and bind) CommonConfiguration to a local variable
@Autowired CommonConfiguration common;
// use Spring magic to autowire (instantiate and bind) DuccTransportConfiguration to a local variable
@Autowired DuccTransportConfiguration processManagerTransport;
/**
* Instantiate {@link ProcessManagerEventListener} which will handle incoming messages.
*
* @param pm - {@link ProcessManagerComponent} instance
* @return - {@link ProcessManagerEventListener}
*/
public ProcessManagerEventListener processManagerDelegateListener(ProcessManagerComponent pm) {
ProcessManagerEventListener pmel = new ProcessManagerEventListener(pm);
pmel.setEndpoint(common.agentRequestEndpoint);
return pmel;
}
/**
* Create a Router to handle incoming messages from a given endpoint. All messages are delegated
* to a provided listener. Note: Camel uses introspection to determine which method to call when
* delegating a message. The name of the method doesnt matter it is the argument that needs
* to match the type of object in the message. If there is no method with a matching argument
* type the message will not be delegated.
*
* @param endpoint - endpoint where messages are expected
* @param delegate - {@link ProcessManagerEventListener} instance
* @return - initialized {@link RouteBuilder} instance
*
*/
public synchronized RouteBuilder routeBuilderForIncomingRequests(final String endpoint, final ProcessManagerEventListener delegate, final ProcessManagerComponent pm) {
return new RouteBuilder() {
public void configure() {
System.out.println("Process Manager waiting for messages on endpoint:"+endpoint);
onException(Throwable.class).
maximumRedeliveries(0). // dont redeliver the message
handled(false). // the caller will receive the exception
process(new ErrorProcessor()); // delegate exception to the handler
from(endpoint)
.bean(delegate);
}
};
}
public class ErrorProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
// the caused by exception is stored in a property on the exchange
Throwable throwable = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
logger.error("ErrorProcessor.process",null, throwable);
throwable.printStackTrace();
}
}
/**
* This class handles a message before it is delegated to a component listener. In this method the message can be enriched,
* logged, etc.
*
*/
public class TransportProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
String methodName="process";
logger.info(methodName, null,"Transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
// Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
// System.out.println("... transport - value of replyTo:" + replyTo);
}
}
public class DebugProcessor implements Processor {
private ProcessManagerComponent pm;
public DebugProcessor(ProcessManagerComponent pm) {
this.pm = pm;
}
public void process(Exchange exchange) throws Exception {
String methodName="process";
if ( pm.getLogLevel().toLowerCase().equals("trace")) {
String marshalledEvent =
XStreamUtils.marshall(exchange.getIn().getBody());
/*
XStreamDataFormat xStreamDataFormat = new XStreamDataFormat();
xStreamDataFormat.setPermissions("*");
XStream xStream = xStreamDataFormat.getXStream(new DefaultClassResolver());
xStream.addPermission(AnyTypePermission.ANY);
String marshalledEvent = xStream.toXML(exchange.getIn().getBody());
*/
pm.logAtTraceLevel(methodName, marshalledEvent);
}
// if ( logger.isDebug() ) {
// XStreamDataFormat xStreamDataFormat = new XStreamDataFormat();
// XStream xStream = xStreamDataFormat.getXStream(new DefaultClassResolver());
// String marshalledEvent = xStream.toXML(exchange.getIn().getBody());
// logger.debug(methodName, null,marshalledEvent);
// }
// Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
// System.out.println("... transport - value of replyTo:" + replyTo);
}
}
/**
* Creates Camel router that will publish ProcessManager state at regular intervals.
*
* @param targetEndpointToReceiveProcessManagerStateUpdate - endpoint where to publish PM state
* @param statePublishRate - how often to publish state
* @return
* @throws Exception
*/
private RouteBuilder routeBuilderForProcessManagerStatePost(final ProcessManagerComponent pm, final String targetEndpointToReceiveProcessManagerStateUpdate, final int statePublishRate) throws Exception {
final ProcessManagerStateProcessor pmsp = // an object responsible for generating the state
new ProcessManagerStateProcessor(pm);
return new RouteBuilder() {
public void configure() {
String methodName = "configure";
final Predicate blastGuard = new DuccBlastGuardPredicate(pm.getLogger());
logger.trace(methodName, null,"timer:pmStateDumpTimer?fixedRate=true&period=" + statePublishRate);
logger.trace(methodName, null,"endpoint=" + targetEndpointToReceiveProcessManagerStateUpdate);
from("timer:pmStateDumpTimer?fixedRate=true&period=" + statePublishRate)
// This route uses a filter to prevent sudden bursts of messages which
// may flood DUCC daemons causing chaos. The filter disposes any message
// that appears in a window of 1 sec or less.
.filter(blastGuard)
.process(pmsp)
.to(targetEndpointToReceiveProcessManagerStateUpdate);
}
};
}
/**
* Camel Processor responsible for generating ProcessManager's state.
*/
private class ProcessManagerStateProcessor implements Processor {
private ProcessManager pm;
private ProcessManagerStateProcessor(ProcessManager pm) {
this.pm = pm;
}
public void process(Exchange exchange) throws Exception {
// Fetch new state from ProcessManager
PmStateDuccEvent jse = pm.getState();
// Add the state object to the Message
exchange.getIn().setBody(jse);
}
}
/**
* Creates and initializes {@link ProcessManagerComponent} instance. @Bean annotation identifies {@link ProcessManagerComponent}
* as a Spring framework Bean which will be managed by Spring container.
*
* @return {@link ProcessManagerComponent} instance
*
* @throws Exception
*/
@Bean
public ProcessManagerComponent processManager() throws Exception {
CamelContext camelContext = common.camelContext();
DuccEventDispatcher eventDispatcher = processManagerTransport.duccEventDispatcher(common.agentRequestEndpoint, camelContext);
logger.info("processManager()",null, "PM publishes state update to Agents on endpoint:"+common.agentRequestEndpoint);
ProcessManagerComponent pm = new ProcessManagerComponent(camelContext, eventDispatcher);
pm.setStateChangeEndpoint(common.daemonsStateChangeEndpoint);
// Instantiate delegate listener to receive incoming messages.
ProcessManagerEventListener delegateListener = this.processManagerDelegateListener(pm);
// Inject a dispatcher into the listener in case it needs to send
// a message to another component
delegateListener.setDuccEventDispatcher(eventDispatcher);
// Inject Camel Router that will delegate messages to Process Manager delegate listener
pm.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.orchestratorStateUpdateEndpoint, delegateListener, pm));
pm.getContext().addRoutes(this.routeBuilderForProcessManagerStatePost(pm, common.pmStateUpdateEndpoint, Integer.parseInt(common.pmStatePublishRate)));
return pm;
}
}