blob: 4c9054039a766cfc50da8c4d7b17f76be729d30f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.ws.config;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.ObjectMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.config.DuccBlastGuardPredicate;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
import org.apache.uima.ducc.ws.DuccBoot;
import org.apache.uima.ducc.ws.WebServerComponent;
import org.apache.uima.ducc.ws.event.WebServerEventListener;
import org.apache.uima.ducc.ws.self.message.WebServerStateProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* A {@link WebServerConfiguration} to configure JobDriver component. Depends on
* properties loaded by a main program into System properties.
*
*/
@Configuration
@Import({DuccTransportConfiguration.class,CommonConfiguration.class})
public class WebServerConfiguration {
// use Spring magic to autowire (instantiate and bind) CommonConfiguration to a local variable
@Autowired CommonConfiguration common;
// use Spring magic to autowire (instantiate and bind) DuccTransportConfiguration to a local variable
@Autowired DuccTransportConfiguration webServerTransport;
private DuccLogger duccLogger = DuccLoggerComponents.getWsLogger(WebServerConfiguration.class.getName());
private DuccId jobid = null;
private AtomicBoolean singleton = new AtomicBoolean(false);
/**
* Instantiate {@link WebServerEventListener} which will handle incoming messages.
*
* @param ws - {@link WebServerComponent} instance
* @return - {@link WebServerEventListener}
*/
public WebServerEventListener webServerDelegateListener(WebServerComponent ws) {
WebServerEventListener wsel = new WebServerEventListener(ws);
return wsel;
}
/**
* Create a Router to handle incoming messages from a given endpoint. All messages are delegated
* to a provided listener. Note: Camel uses introspection to determine which method to call when
* delegating a message. The name of the method doesnt matter it is the argument that needs
* to match the type of object in the message. If there is no method with a matching argument
* type the message will not be delegated.
*
* @param endpoint - endpoint where messages are expected
* @param delegate - {@link WebServerEventListener} instance
* @return - initialized {@link RouteBuilder} instance
*
*/
public synchronized RouteBuilder routeBuilderForIncomingRequests(final String endpoint, final WebServerEventListener delegate) {
return new RouteBuilder() {
Processor p = new AccessJmsBody();
public void configure() {
from(endpoint)
.process(p)
//from("activemq:topic:tmp-jm-state")
.bean(delegate);
}
};
}
/**
* Creates Camel router that will publish WebServer state at regular intervals.
*
* Note: failure to receive these self-publications indicates that broker is down!
*
* @param targetEndpointToReceiveWebServerStateUpdate - endpoint where to publish WS state
* @param statePublishRate - how often to publish state
* @return
* @throws Exception
*/
private RouteBuilder routeBuilderForWebServerStatePost(final String targetEndpointToReceiveWebServerStateUpdate, final int statePublishRate) throws Exception {
final WebServerStateProcessor wssp = // an object responsible for generating the state
new WebServerStateProcessor();
return new RouteBuilder() {
public void configure() {
final Predicate blastFilter = new DuccBlastGuardPredicate(duccLogger);
from("timer:webserverStateDumpTimer?fixedRate=true&period=" + statePublishRate)
// This route uses a filter to prevent sudden bursts of messages which
// may flood DUCC daemons causing chaos. The filter disposes any event
// that appears in a window of 1 sec or less.
.filter(blastFilter)
//.process(xmStart)
.process(wssp)
//.process(xmEnded)
.to(targetEndpointToReceiveWebServerStateUpdate)
;
}
};
}
/**
* Creates and initializes {@link WebServerComponent} instance. @Bean annotation identifies {@link WebServerComponent}
* as a Spring framework Bean which will be managed by Spring container.
*
* @return {@link WebServerComponent} instance
*
* @throws Exception
*/
@Bean
public WebServerComponent webServer() throws Exception {
String methodName = "webServer";
WebServerComponent ws = null;
try {
if(singleton.getAndSet(true)) {
try {
throw new RuntimeException("singleton already present!");
}
catch(RuntimeException e) {
duccLogger.error(methodName, jobid, e);
throw e;
}
}
ws = new WebServerComponent(common.camelContext(), common);
DuccBoot.boot(common);
// Instantiate delegate listener to receive incoming messages.
WebServerEventListener delegateListener = this.webServerDelegateListener(ws);
// Inject a dispatcher into the listener in case it needs to send
// a message to another component
delegateListener.setDuccEventDispatcher(webServerTransport.duccEventDispatcher(common.orchestratorStateUpdateEndpoint, ws.getContext()));
// Inject Camel Router that will delegate messages to WebServer delegate listener
ws.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.orchestratorStateUpdateEndpoint, delegateListener));
ws.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.nodeMetricsEndpoint, delegateListener));
ws.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.rmStateUpdateEndpoint, delegateListener));
ws.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.smStateUpdateEndpoint, delegateListener));
ws.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.pmStateUpdateEndpoint, delegateListener));
ws.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.wsStateUpdateEndpoint, delegateListener));
ws.getContext().addRoutes(this.routeBuilderForWebServerStatePost(common.wsStateUpdateEndpoint, Integer.parseInt(common.wsStatePublishRate)));
String dbEndpoint = common.dbComponentStateUpdateEndpoint;
if(dbEndpoint != null) {
ws.getContext().addRoutes(this.routeBuilderForIncomingRequests(common.dbComponentStateUpdateEndpoint, delegateListener));
}
else {
duccLogger.warn(methodName, jobid, "db endpoint not configured");
}
}
catch(Throwable t) {
duccLogger.error(methodName, jobid, t);
}
return ws;
}
public class AccessJmsBody implements Processor
{
int msglen = 0;
String objectName = "unknown";
public void process(Exchange exchange) throws Exception {
String location = "AccessJmsBody:process";
try {
Object o = exchange.getIn();
if ( o instanceof JmsMessage ) {
JmsMessage msg = (JmsMessage) o;
o = msg.getJmsMessage();
if ( o instanceof ActiveMQMessage ) {
ActiveMQMessage amqMessage = (ActiveMQMessage) o;
if ( amqMessage instanceof ObjectMessage ) {
Object body = ((ObjectMessage)amqMessage).getObject();
ByteSequence bs = amqMessage.getContent();
msglen = bs.getLength();
objectName = body.getClass().getName();
}
}
}
Long pubSize = new Long(msglen);
exchange.getIn().setHeader("pubSize", pubSize);
String text = "Message length is " + msglen + " for " + objectName;
duccLogger.debug(location, jobid, text);
}
catch (Throwable t) {
duccLogger.error(location, jobid, t);
}
}
}
}