blob: f3c0755e2764161c7f9e2e688b8ef887f8fc9162 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.deploy.uima;
import java.net.InetAddress;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.agent.deploy.ManagedService;
import org.apache.uima.ducc.agent.deploy.ServiceAdapter;
import org.apache.uima.ducc.agent.deploy.ServiceStateNotificationAdapter;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.DuccExchange;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@Configuration
@Import({ DuccTransportConfiguration.class, CommonConfiguration.class })
public class UimaAsServiceConfiguration {
// Injected transport configuration; managedService() uses it to create the
// DuccEventDispatcher for the state update endpoint.
@Autowired
DuccTransportConfiguration transport;
// Injected common configuration; supplies the Camel context, the endpoint names/types/params
// and the saxon / dd2spring paths read in managedService(). Note that managedService() also
// writes the resolved endpoint URIs back into this object.
@Autowired
CommonConfiguration common;
// Route builder created by managedService() and added to camelContext.
RouteBuilder routeBuilder;
// Camel context obtained from 'common' in managedService(); stop() iterates its routes.
// Remains null until managedService() has run.
CamelContext camelContext;
// Value of the 'ducc.uima-as.swap.usage.script' system property, injected by Spring.
// NOTE(review): not referenced anywhere else in the visible part of this class.
@Value("#{ systemProperties['ducc.uima-as.swap.usage.script'] }")
String swapUsageCollectorScript;
/**
 * Builds the Camel route definition that consumes messages from the managed service endpoint
 * ({@code common.managedServiceEndpoint}) and forwards the ones addressed to this process to
 * the supplied listener. Any exception raised while routing is marked as handled and passed
 * to an {@code ErrorProcessor}.
 *
 * @param thisNodeIP
 *          - IP of this node; a message is accepted only when its target node header equals
 *          this value and its PID header equals the PID of this process
 * @param delegate
 *          - {@code ProcessEventListener} bean that accepted messages are delegated to
 *
 * @return {@code RouteBuilder} instance; the caller is responsible for adding it to a Camel
 *         context
 */
public synchronized RouteBuilder routeBuilderForIncomingRequests(final String thisNodeIP,
        final ProcessEventListener delegate) {
  // Selects only the messages whose PID / node IP headers identify this process.
  final Predicate targetFilter = new DuccProcessFilter(thisNodeIP);

  return new RouteBuilder() {
    @Override
    public void configure() throws Exception {
      final String endpoint = common.managedServiceEndpoint;
      System.out.println("Service Wrapper Starting Request Channel on Endpoint:" + endpoint);

      // Route-level error handling: mark as handled and dump the cause.
      onException(Exception.class).handled(true).process(new ErrorProcessor()).end();

      // The endpoint URI doubles as the id set at the end of the route definition.
      from(endpoint).choice().when(targetFilter).bean(delegate).end().setId(endpoint);
    }
  };
}
/**
 * Failure handler installed by the route's {@code onException} clause. It prints the stack
 * trace of the exception that Camel stored on the exchange. Because the clause marks the
 * exception as handled, Camel treats this processor as the end of the route for the failed
 * message and performs no redelivery.
 */
public class ErrorProcessor implements Processor {
  /**
   * Prints the exception that caused the routing failure.
   *
   * @param exchange
   *          - the failed exchange; the cause is read from its
   *          {@code Exchange.EXCEPTION_CAUGHT} property
   */
  public void process(Exchange exchange) throws Exception {
    // The "caused by" exception is stored in a property on the exchange.
    Throwable caused = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
    if (caused == null) {
      // Guard against a missing property so the error handler itself never fails with an NPE.
      System.err.println("ErrorProcessor invoked but no caught exception is set on the exchange");
      return;
    }
    caused.printStackTrace();
  }
}
/**
 * Creates the listener that incoming agent messages are delegated to.
 *
 * @param service
 *          - the managed service handed to the new listener
 * @return a new {@code ProcessEventListener} wrapping {@code service}
 */
private ProcessEventListener processDelegateListener(ManagedService service) {
  final ProcessEventListener listener = new ProcessEventListener(service);
  return listener;
}
/**
 * Creates the adapter handed to the managed service, built from the event dispatcher, the
 * unique DUCC process id and an endpoint string.
 *
 * @param eventDispatcher
 *          - dispatcher passed through to the {@code ServiceAdapter}
 * @param stateUpdateEndpoint
 *          - endpoint string passed through to the {@code ServiceAdapter}.
 *          NOTE(review): managedService() passes {@code common.managedServiceEndpoint} here,
 *          not the state update endpoint - confirm which one ServiceAdapter expects.
 * @return a new {@code ServiceAdapter}
 */
private ServiceStateNotificationAdapter serviceAdapter(DuccEventDispatcher eventDispatcher,
        String stateUpdateEndpoint) {
  // Unique process id is read from a JVM system property (-D).
  final String processUniqueId = System
          .getProperty(IDuccUser.DashD.DUCC_ID_PROCESS_UNIQUE.value());
  return new ServiceAdapter(eventDispatcher, processUniqueId, stateUpdateEndpoint);
}
/**
 * Creates and wires the {@code ManagedUimaService} bean.
 * <p>
 * Steps, in order: resolve this node's IP, obtain the Camel context, resolve the agent state
 * update endpoint and the service request endpoint (building {@code mina:tcp} URIs when the
 * corresponding endpoint type is "socket"), create the event dispatcher and the service, and
 * finally register the route that delivers incoming agent messages to the service.
 * <p>
 * Side effects: assigns {@code camelContext} and {@code routeBuilder}, and may overwrite
 * {@code common.managedProcessStateUpdateEndpoint} and {@code common.managedServiceEndpoint}.
 *
 * @return the configured managed service
 * @throws IllegalStateException
 *           if the state update endpoint type is "socket" but the DUCC_UPDATE_PORT environment
 *           variable is missing or blank
 * @throws Exception
 *           any other failure during setup; the stack trace is printed before rethrowing
 */
@Bean
public ManagedService managedService() throws Exception {
  try {
    // Assume IP address provided from environment. In production this
    // will be the actual node IP. In testing, the IP can be virtual
    // when running multiple agents on the same node. The agent is
    // responsible for providing the IP in this process environment.
    // Fall back to the local host address when the variable is not set.
    String envNodeIP = System.getenv(IDuccUser.EnvironmentVariable.DUCC_IP.value());
    String thisNodeIP = (envNodeIP == null)
            ? InetAddress.getLocalHost().getHostAddress()
            : envNodeIP;

    camelContext = common.camelContext();

    // Optional URI query parameters for the two endpoints.
    String agentSocketParams = "";
    String jpSocketParams = "";
    if (common.managedServiceEndpointParams != null) {
      jpSocketParams = "?" + common.managedServiceEndpointParams;
    }
    if (common.managedProcessStateUpdateEndpointParams != null) {
      agentSocketParams = "?" + common.managedProcessStateUpdateEndpointParams;
    }

    // Set up agent socket endpoint where this UIMA AS service will send state updates.
    if (common.managedProcessStateUpdateEndpointType != null
            && common.managedProcessStateUpdateEndpointType.equalsIgnoreCase("socket")) {
      String updatePortVar = IDuccUser.EnvironmentVariable.DUCC_UPDATE_PORT.value();
      String updatePort = System.getenv(updatePortVar);
      // Fail fast instead of silently producing an unusable "localhost:null" endpoint.
      if (updatePort == null || updatePort.trim().length() == 0) {
        throw new IllegalStateException("Environment variable " + updatePortVar
                + " is not set; unable to build the agent state update socket endpoint");
      }
      common.managedProcessStateUpdateEndpoint = "mina:tcp://localhost:" + updatePort
              + agentSocketParams;
    }

    // Set up a socket endpoint where the UIMA AS service will receive events sent from its
    // agent. The service is on the same node as the agent, hence localhost.
    if (common.managedServiceEndpointType != null
            && common.managedServiceEndpointType.equalsIgnoreCase("socket")) {
      int serviceSocketPort = Utils.findFreePort();
      common.managedServiceEndpoint = "mina:tcp://localhost:" + serviceSocketPort
              + jpSocketParams;
    }

    // Dispatcher bound to the state update endpoint. Per the original note, the transport
    // configuration may activate an ActiveMQ component for JMS when the endpoint starts
    // with "activemq:" (that logic is not part of this class).
    DuccEventDispatcher eventDispatcher = transport
            .duccEventDispatcher(common.managedProcessStateUpdateEndpoint, camelContext);

    ManagedUimaService service = new ManagedUimaService(common.saxonJarPath,
            common.dd2SpringXslPath,
            serviceAdapter(eventDispatcher, common.managedServiceEndpoint), camelContext);
    service.setConfigFactory(this);
    service.setAgentStateUpdateEndpoint(common.managedProcessStateUpdateEndpoint);

    System.out.println("#######################################################");
    System.out.println("## Agent Service State Update Endpoint:"
            + common.managedProcessStateUpdateEndpoint + " ##");
    System.out.println("#######################################################");

    // Route incoming agent messages addressed to this process to the service.
    ProcessEventListener delegateListener = processDelegateListener(service);
    delegateListener.setDuccEventDispatcher(eventDispatcher);
    routeBuilder = this.routeBuilderForIncomingRequests(thisNodeIP, delegateListener);
    camelContext.addRoutes(routeBuilder);
    return service;
  } catch (Exception e) {
    // Make the failure visible on the process console before propagating it to Spring.
    e.printStackTrace();
    throw e;
  }
}
/**
 * Stops the consumer of every route registered in the Camel context and logs each stopped
 * route id. Does nothing when the context has not been created yet. The Camel context itself
 * is not stopped here.
 *
 * @throws Exception
 *           if stopping a consumer fails; routes after the failing one are left untouched
 */
public void stop() throws Exception {
  if (camelContext == null) {
    return;
  }
  for (Route activeRoute : camelContext.getRoutes()) {
    activeRoute.getConsumer().stop();
    System.out.println(">>> configFactory.stop() - stopped route:" + activeRoute.getId());
  }
}
/**
 * Camel predicate that accepts a message only when it targets this process, i.e. when the
 * message's PID header equals this process's PID and its node IP header equals the IP this
 * filter was created with.
 */
private class DuccProcessFilter implements Predicate {
  private final String thisNodeIP;

  /**
   * @param thisNodeIP
   *          - IP of the node this process runs on
   */
  public DuccProcessFilter(final String thisNodeIP) {
    this.thisNodeIP = thisNodeIP;
  }

  /**
   * Tests whether the exchange is addressed to this process.
   *
   * @param exchange
   *          - the incoming exchange whose headers are inspected
   * @return {@code true} when both PID and node IP match; {@code false} otherwise, including
   *         when reading or comparing the headers throws (the error is printed, not
   *         propagated)
   */
  public synchronized boolean matches(Exchange exchange) {
    boolean targeted = false;
    try {
      final String pid = (String) exchange.getIn().getHeader(DuccExchange.ProcessPID);
      final String targetIP = (String) exchange.getIn().getHeader(DuccExchange.DUCCNODEIP);
      // The message is for us only when both the PID of this process and the node match.
      targeted = Utils.getPID().equals(pid) && thisNodeIP.equals(targetIP);
      if (targeted) {
        System.out.println(">>>>>>>>> Process Received a Message. Is Process target for message:"
                + targeted + ". Target PID:" + pid);
      }
    } catch (Throwable e) {
      // Any failure is treated as "not for this process".
      e.printStackTrace();
    }
    return targeted;
  }
}
}