blob: 9f293379ee9df39bdf18ff2c9cce790847ce182e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.rm.config;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.uima.ducc.common.config.CommonConfiguration;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
import org.apache.uima.ducc.rm.NodeStability;
import org.apache.uima.ducc.rm.ResourceManager;
import org.apache.uima.ducc.rm.ResourceManagerComponent;
import org.apache.uima.ducc.rm.event.ResourceManagerEventListener;
import org.apache.uima.ducc.rm.scheduler.SchedConstants;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
 * Spring configuration for the DUCC Resource Manager (RM).
 *
 * Imports {@link DuccTransportConfiguration} and {@link CommonConfiguration}, and exposes a
 * single {@link ResourceManagerComponent} bean whose Camel context is wired with routes that
 * deliver Orchestrator state updates and Node Metrics messages to a
 * {@link ResourceManagerEventListener}.
 *
 * The unqualified constants used below (COMPONENT_NAME, DEFAULT_STABILITY_COUNT,
 * DEFAULT_NODE_METRICS_RATE) are not declared in this class; they are presumably inherited
 * from {@link SchedConstants}.
 */
@Configuration
@Import({DuccTransportConfiguration.class,CommonConfiguration.class})
public class ResourceManagerConfiguration
    implements SchedConstants
{
    // Shared DUCC configuration: supplies the Camel context and the endpoint names used below.
    @Autowired CommonConfiguration common;

    // Transport configuration: used to create the event dispatchers for the RM and its listener.
    @Autowired DuccTransportConfiguration resourceManagerTransport;

    DuccLogger logger = DuccLogger.getLogger(this.getClass(), COMPONENT_NAME);

    /**
     * Creates the listener that receives incoming messages on behalf of the Resource Manager
     * and attaches a {@link NodeStability} instance to both the listener and the RM.
     *
     * The NodeStability instance is configured from two integer system properties:
     * "ducc.rm.node.stability" (DEFAULT_STABILITY_COUNT is passed as the fallback) and
     * "ducc.agent.node.metrics.publish.rate" (DEFAULT_NODE_METRICS_RATE is passed as the
     * fallback). Its start() method is invoked before this method returns.
     *
     * @param rm the Resource Manager component the listener delegates to
     * @return the configured listener, with its endpoint set to the RM state update endpoint
     */
    public ResourceManagerEventListener resourceManagerDelegateListener(ResourceManagerComponent rm) {
        ResourceManagerEventListener listener = new ResourceManagerEventListener(rm);

        int nodeStability = SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", DEFAULT_STABILITY_COUNT);
        int agentMetricsRate = SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", DEFAULT_NODE_METRICS_RATE);
        NodeStability stability = new NodeStability(rm, nodeStability, agentMetricsRate);

        rm.setNodeStability(stability);
        listener.setEndpoint(common.rmStateUpdateEndpoint);
        listener.setNodeStability(stability);

        // Started only after both the RM and the listener hold a reference to it.
        stability.start();
        return listener;
    }

    /**
     * Builds a Camel route that hands every message arriving on the endpoint to the delegate.
     *
     * @param endpoint the Camel endpoint URI to consume from
     * @param delegate the listener bean that receives each message
     * @return a route builder; the route is only created once it is added to a Camel context
     */
    public RouteBuilder routeBuilderForEndpoint(final String endpoint, final ResourceManagerEventListener delegate) {
        return new RouteBuilder() {
            @Override
            public void configure() {
                from(endpoint)
                    .bean(delegate);
            }
        };
    }

    /**
     * Test and debug only - use {@link #routeBuilderForEndpoint} normally.
     *
     * Same route as {@link #routeBuilderForEndpoint}, but with a threads(10) step inserted
     * ahead of the delegate. Also prints a marker line to standard output when called.
     *
     * @param endpoint the Camel endpoint URI to consume from
     * @param delegate the listener bean that receives each message
     * @return a route builder for the threaded route
     */
    public RouteBuilder routeBuilderForJmEndpoint(final String endpoint, final ResourceManagerEventListener delegate) {
        System.out.println("Starting JM endpoint " + endpoint + " ???????????????????????");
        return new RouteBuilder() {
            @Override
            public void configure() {
                from(endpoint)
                    .threads(10)
                    .bean(delegate);
            }
        };
    }

    /**
     * Creates and wires the Resource Manager component bean.
     *
     * Steps, in order: construct the component on the shared Camel context; give it a
     * dispatcher for the RM state update endpoint; create the delegate listener and give it
     * a dispatcher for the Orchestrator state update endpoint; register routes that deliver
     * Orchestrator state updates and Node Metrics messages to the listener.
     *
     * @return the fully wired Resource Manager component
     * @throws IllegalStateException if any step fails; the failure is logged at fatal level
     *         and the original throwable is attached as the cause
     * @throws Throwable retained in the signature for compatibility with existing callers
     */
    @Bean
    public ResourceManagerComponent resourceManager()
        throws Throwable
    {
        String methodName = "resourceManager";
        try {
            ResourceManagerComponent rm = new ResourceManagerComponent(common.camelContext());
            // rm.init();

            rm.setTransportConfiguration(resourceManagerTransport.duccEventDispatcher(common.rmStateUpdateEndpoint, rm.getContext()),
                                         common.rmStateUpdateEndpoint);

            // Instantiate Resource Manager delegate listener. This listener will receive
            // incoming messages.
            ResourceManagerEventListener delegateListener = this.resourceManagerDelegateListener(rm);

            // Inject a dispatcher into the listener in case it needs to send
            // a message to another component.
            delegateListener.setDuccEventDispatcher(resourceManagerTransport.duccEventDispatcher(common.orchestratorStateUpdateEndpoint, rm.getContext()));

            // Disabled: Camel Router that would generate state updates at regular intervals
            // jrc rm.getContext().addRoutes(this.routeBuilderForRMStateUpdate(rm, common.rmStateUpdateEndpoint, Integer.parseInt(common.rmStatePublishRate)));

            // Inject Camel Router that will handle Orchestrator state update messages
            rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.orchestratorStateUpdateEndpoint, delegateListener));

            // Disabled: Camel Router that would handle Agent Node inventory update messages
            // rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeInventoryEndpoint,delegateListener));

            // Inject Camel Router that will handle Node Metrics messages
            rm.getContext().addRoutes(this.routeBuilderForEndpoint(common.nodeMetricsEndpoint, delegateListener));

            return rm;
        } catch ( Throwable t ) {
            logger.fatal(methodName, null, t);
            // Attach the original throwable as the cause so its stack trace is not lost
            // to whoever handles the failure further up.
            throw new IllegalStateException("Can't start RM: " + t.getMessage(), t);
        }
    }

    /**
     * Camel processor that replaces the incoming message body with the current state
     * returned by the wrapped {@link ResourceManager}.
     */
    public class ResourceManagerStateUpdateProcessor implements Processor {
        private ResourceManager resourceManager;

        /**
         * @param resourceManager the Resource Manager whose state is published
         */
        public ResourceManagerStateUpdateProcessor(ResourceManager resourceManager) {
            this.resourceManager = resourceManager;
        }

        /**
         * Sets the body of the exchange's in-message to {@code resourceManager.getState()}.
         */
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody(resourceManager.getState()); //new RmStateDuccEvent());
        }
    }

    /**
     * Placeholder processor for node inventory messages; currently does nothing.
     */
    public class NodeInventoryProcessor implements Processor {
        /** Intentionally a no-op; only disabled debug output remains. */
        @Override
        public void process(Exchange exchange) throws Exception {
            // System.out.println("... transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
            //Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
            // System.out.println("... transport - value of replyTo:" + replyTo);
        }
    }

    /**
     * Placeholder processor for node metrics messages; currently does nothing.
     */
    public class NodeMetricsProcessor implements Processor {
        /** Intentionally a no-op; only disabled debug output remains. */
        @Override
        public void process(Exchange exchange) throws Exception {
            // System.out.println("... transport received Event. Body Type:"+exchange.getIn().getBody().getClass().getName());
            //Destination replyTo = exchange.getIn().getHeader("JMSReplyTo", Destination.class);
            // System.out.println("... transport - value of replyTo:" + replyTo);
        }
    }
}