blob: 472353486a9704acee47b8c92ce94bc3b3345464 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.controlbus;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Route;
import org.apache.camel.ServiceStatus;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
/**
* The control bus producer.
*/
public class ControlBusProducer extends DefaultAsyncProducer {
private final CamelLogger logger;
public ControlBusProducer(Endpoint endpoint, CamelLogger logger) {
super(endpoint);
this.logger = logger;
}
@Override
public ControlBusEndpoint getEndpoint() {
return (ControlBusEndpoint) super.getEndpoint();
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (getEndpoint().getLanguage() != null) {
try {
processByLanguage(exchange, getEndpoint().getLanguage());
} catch (Exception e) {
exchange.setException(e);
}
} else if (getEndpoint().getAction() != null) {
try {
processByAction(exchange);
} catch (Exception e) {
exchange.setException(e);
}
}
callback.done(true);
return true;
}
protected void processByLanguage(Exchange exchange, Language language) throws Exception {
LanguageTask task = new LanguageTask(exchange, language);
if (getEndpoint().isAsync()) {
getEndpoint().getComponent().getExecutorService().submit(task);
} else {
task.run();
}
}
protected void processByAction(Exchange exchange) throws Exception {
ActionTask task = new ActionTask(exchange);
if (getEndpoint().isAsync()) {
getEndpoint().getComponent().getExecutorService().submit(task);
} else {
task.run();
}
}
private static String getRouteId(Exchange exchange) {
String answer = null;
UnitOfWork uow = exchange.getUnitOfWork();
RouteContext rc = uow != null ? uow.getRouteContext() : null;
if (rc != null) {
answer = rc.getRoute().getId();
}
if (answer == null) {
// fallback and get from route id on the exchange
answer = exchange.getFromRouteId();
}
return answer;
}
/**
* Tasks to run when processing by language.
*/
private final class LanguageTask implements Runnable {
private final Exchange exchange;
private final Language language;
private LanguageTask(Exchange exchange, Language language) {
this.exchange = exchange;
this.language = language;
}
@Override
public void run() {
String task = null;
Object result = null;
try {
// create dummy exchange
Exchange dummy = ExchangeHelper.createCopy(exchange, true);
task = dummy.getIn().getMandatoryBody(String.class);
if (task != null) {
Expression exp = language.createExpression(task);
result = exp.evaluate(dummy, Object.class);
}
if (result != null && !getEndpoint().isAsync()) {
// can only set result on exchange if sync
exchange.getIn().setBody(result);
}
if (task != null) {
logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void"));
}
} catch (Exception e) {
logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e);
}
}
}
/**
* Tasks to run when processing by route action.
*/
private final class ActionTask implements Runnable {
private final Exchange exchange;
private ActionTask(Exchange exchange) {
this.exchange = exchange;
}
@Override
public void run() {
String action = getEndpoint().getAction();
String id = getEndpoint().getRouteId();
if (ObjectHelper.equal("current", id)) {
id = getRouteId(exchange);
}
Object result = null;
String task = action + " route " + id;
try {
if ("start".equals(action)) {
log.debug("Starting route: {}", id);
getEndpoint().getCamelContext().getRouteController().startRoute(id);
} else if ("stop".equals(action)) {
log.debug("Stopping route: {}", id);
getEndpoint().getCamelContext().getRouteController().stopRoute(id);
} else if ("suspend".equals(action)) {
log.debug("Suspending route: {}", id);
getEndpoint().getCamelContext().getRouteController().suspendRoute(id);
} else if ("resume".equals(action)) {
log.debug("Resuming route: {}", id);
getEndpoint().getCamelContext().getRouteController().resumeRoute(id);
} else if ("restart".equals(action)) {
log.debug("Restarting route: {}", id);
getEndpoint().getCamelContext().getRouteController().stopRoute(id);
int delay = getEndpoint().getRestartDelay();
if (delay > 0) {
try {
log.debug("Sleeping {} ms before starting route: {}", delay, id);
Thread.sleep(delay);
} catch (InterruptedException e) {
// ignore
}
}
getEndpoint().getCamelContext().getRouteController().startRoute(id);
} else if ("status".equals(action)) {
log.debug("Route status: {}", id);
ServiceStatus status = getEndpoint().getCamelContext().getRouteController().getRouteStatus(id);
if (status != null) {
result = status.name();
}
} else if ("stats".equals(action)) {
log.debug("Route stats: {}", id);
// camel context or per route
String name = getEndpoint().getCamelContext().getManagementName();
if (name == null) {
result = "JMX is disabled, cannot get stats";
} else {
ObjectName on;
String operation;
if (id == null) {
CamelContext camelContext = getEndpoint().getCamelContext();
on = getEndpoint().getCamelContext().getManagementStrategy().getManagementObjectNameStrategy().getObjectNameForCamelContext(camelContext);
operation = "dumpRoutesStatsAsXml";
} else {
Route route = getEndpoint().getCamelContext().getRoute(id);
on = getEndpoint().getCamelContext().getManagementStrategy().getManagementObjectNameStrategy().getObjectNameForRoute(route);
operation = "dumpRouteStatsAsXml";
}
if (on != null) {
MBeanServer server = getEndpoint().getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer();
result = server.invoke(on, operation, new Object[]{true, true}, new String[]{"boolean", "boolean"});
} else {
result = "Cannot lookup route with id " + id;
}
}
}
if (result != null && !getEndpoint().isAsync()) {
// can only set result on exchange if sync
exchange.getIn().setBody(result);
}
logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void"));
} catch (Exception e) {
logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e);
}
}
}
}