blob: 0b18f9e2e37f13cf64b9cf58e337d78e1f6fc768 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.sm.config;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jetty9.JettyHttpComponent9;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.exception.DuccRuntimeException;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.XStreamUtils;
import org.apache.uima.ducc.sm.ServiceManagerComponent;
import org.apache.uima.ducc.sm.event.ServiceManagerEventListener;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
import org.apache.uima.ducc.transport.event.AServiceRequest;
import org.apache.uima.ducc.transport.event.ServiceReplyEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* A {@link ServiceManagerConfiguration} to configure Service Manager component. Depends on
* properties loaded by a main program into System properties.
*
*/
@Configuration
@Import({DuccTransportConfiguration.class,CommonConfiguration.class})
public class ServiceManagerConfiguration
{
// use Spring magic to autowire (instantiate and bind) CommonConfiguration to a local variable
@Autowired CommonConfiguration common;
// use Spring magic to autowire (instantiate and bind) DuccTransportConfiguration to a local variable
@Autowired DuccTransportConfiguration serviceManagerTransport;
private DuccLogger logger = DuccLogger.getLogger(this.getClass(), "SM");
/**
* Instantiate {@link ServiceManagerEventListener} which will handle incoming messages.
*
* @param sm - {@link ServiceManagerComponent} instance
* @return - {@link ServiceManagerEventListener}
*/
public ServiceManagerEventListener serviceManagerDelegateListener(ServiceManagerComponent sm) {
ServiceManagerEventListener smel = new ServiceManagerEventListener(sm);
//smel.setEndpoint(common.jmStateUpdateEndpoint);
smel.setEndpoint(common.pmRequestEndpoint);
return smel;
}
/**
* Create a Router to handle incoming messages from a given endpoint. All messages are delegated
* to a provided listener. Note: Camel uses introspection to determine which method to call when
* delegating a message. The name of the method doesnt matter it is the argument that needs
* to match the type of object in the message. If there is no method with a matching argument
* type the message will not be delegated.
*
* @param endpoint - endpoint where messages are expected
* @param delegate - {@link ServiceManagerEventListener} instance
* @return - initialized {@link RouteBuilder} instance
*
*/
public synchronized RouteBuilder routeBuilderForIncomingRequests(final String endpoint, final ServiceManagerEventListener delegate) {
return new RouteBuilder() {
public void configure() {
from(endpoint)
//from("activemq:topic:tmp-jm-state")
.process(new TransportProcessor())
.bean(delegate);
}
};
}
/**
* @param endpoint - endpoint where messages are expected
* @param delegate - {@link ServiceManagerEventListener} instance
* @return - initialized {@link RouteBuilder} instance
*/
/*
public synchronized RouteBuilder routeBuilderForApi(final String endpoint, final ServiceManagerEventListener delegate)
{
return new RouteBuilder() {
public void configure() {
from(endpoint)
//from("activemq:topic:tmp-jm-state")
.process(new TransportProcessor())
.bean(delegate)
.process(new SmReplyProcessor()) // inject reply object
;
}
};
}
*/
private RouteBuilder routeBuilderForJetty(final CamelContext context, final ServiceManagerEventListener delegate) throws Exception {
return new RouteBuilder() {
public void configure() {
JettyHttpComponent9 jettyComponent = new JettyHttpComponent9();
String port = System.getProperty("ducc.sm.http.port");
//ExchangeMonitor xmError = new ExchangeMonitor(LifeStatus.Error, ExchangeType.Receive);
context.addComponent("jetty", jettyComponent);
onException(Throwable.class).maximumRedeliveries(0).handled(false).process(new ErrorProcessor());
from("jetty://http://0.0.0.0:" + port + "/sm")
.unmarshal().xstream()
.bean(delegate)
.process(new SmReplyProcessor()) // inject reply object
.process(new JettyReplyProcessor()) // translate to http response
;
}
};
}
private class SmReplyProcessor implements Processor {
private SmReplyProcessor() {
}
public void process(Exchange exchange) throws Exception
{
String methodName = "process";
try {
logger.info(methodName, null, "Replying");
AServiceRequest incoming = (AServiceRequest) exchange.getIn().getBody();
ServiceReplyEvent reply = incoming.getReply();
exchange.getIn().setBody(reply);
} catch ( Throwable t ) {
logger.error(methodName, null, t);
}
}
}
private class JettyReplyProcessor
implements Processor
{
public void process(Exchange exchange) throws Exception
{
exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
exchange.getOut().setHeader("content-type", "text/xml");
Object o = exchange.getIn().getBody();
if ( o != null ) {
String body = XStreamUtils.marshall(o);
exchange.getOut().setBody(body);
exchange.getOut().setHeader("content-length", body.length());
} else {
logger.warn("RouteBuilder.configure", null, new DuccRuntimeException("No reply object was provided."));
exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 500);
}
}
}
public class ErrorProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
// the caused by exception is stored in a property on the exchange
Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
try {
logger.error("ErrorProcessor.process", null, caused);
exchange.getOut().setBody(XStreamUtils.marshall(caused));
} catch( Throwable t ) {
logger.error("ErrorProcessor.process", null,t);
}
}
}
/**
* This class handles a message before it is delegated to a component listener. In this method the message can be enriched,
* logged, etc.
*
*/
public class TransportProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
// System.out.println("... SM transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
// Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
// System.out.println("... transport - value of replyTo:" + replyTo);
}
}
/**
* Creates Camel router that will publish Service Manager state at regular intervals.
*
* @param targetEndpointToReceiveSMStateUpdate - endpoint where to publish SM state
* @param statePublishRate - how often to publish state
* @return
* @throws Exception
* @deprecated
*
private RouteBuilder routeBuilderForSMStatePost(final ServiceManager sm, final String targetEndpointToReceiveSMStateUpdate, final int statePublishRate) throws Exception {
final ServiceManagerStateProcessor smp = // an object responsible for generating the state
new ServiceManagerStateProcessor(sm);
return new RouteBuilder() {
public void configure() {
from("timer:smStateDumpTimer?fixedRate=true&period=" + statePublishRate)
.process(smp)
.marshal()
.xstream()
.to(targetEndpointToReceiveSMStateUpdate);
}
};
}
*/
/**
* Camel Processor responsible for generating Service Manager's state.
*
*
private class ServiceManagerStateProcessor implements Processor {
private ServiceManager sm;
private ServiceManagerStateProcessor(ServiceManager sm) {
this.sm = sm;
}
public void process(Exchange exchange) throws Exception {
// Fetch new state from Job Manager
SmStateDuccEvent sse = sm.getState();
// Add the state object to the Message
exchange.getIn().setBody(sse);
}
}
*/
/**
* Creates and initializes {@link ServiceManagerComponent} instance. @Bean annotation identifies {@link ServiceManagerComponent}
* as a Spring framework Bean which will be managed by Spring container.
*
* @return {@link ServiceManagerComponent} instance
*
* @throws Exception
*/
@Bean
public ServiceManagerComponent serviceManager() throws Exception {
ServiceManagerComponent sm = new ServiceManagerComponent(common.camelContext());
// Instantiate delegate listener to receive incoming messages.
ServiceManagerEventListener delegateListener = this.serviceManagerDelegateListener(sm);
// Inject a dispatcher into the listener in case it needs to send
// a message to another component
// delegateListener.setDuccEventDispatcher(serviceManagerTransport.duccEventDispatcher(common.orchestratorStateUpdateEndpoint, sm.getContext()));
// Set context so SM can send state when it wants to (not on timer)
sm.setTransportConfiguration(serviceManagerTransport.duccEventDispatcher(common.smStateUpdateEndpoint, sm.getContext()),
common.smStateUpdateEndpoint);
// OR state messages - incoming
sm.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.orchestratorStateUpdateEndpoint, delegateListener));
// API requests - incoming
//sm.getContext().addRoutes(this.routeBuilderForApi(common.smRequestEndpoint, delegateListener));
sm.getContext().addRoutes(this.routeBuilderForJetty(sm.getContext(), delegateListener));
// TODO Not used - timer to send state. We now send whenever we get an OR heartbeat.
//sm.getContext().addRoutes(this.routeBuilderForSMStatePost(sm, common.smStateUpdateEndpoint, Integer.parseInt(common.smStatePublishRate)));
return sm;
}
}