blob: ee7890afde550eff841c9b51b07da603c81b9ddd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.camelplugin;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.spring.Utils;
import org.apache.activemq.usage.Usage;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.RoutesDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import java.io.File;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* A StatisticsBroker You can retrieve a Map Message for a Destination - or
* Broker containing statistics as key-value pairs The message must contain a
* replyTo Destination - else its ignored
*
*/
public class CamelRoutesBroker extends BrokerFilter {
private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBroker.class);
private String routesFile = "";
private int checkPeriod = 1000;
private Resource theRoutes;
private DefaultCamelContext camelContext;
private long lastRoutesModified = -1;
private CountDownLatch countDownLatch;
/**
* Overide methods to pause the broker whilst camel routes are loaded
*/
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
blockWhileLoadingCamelRoutes();
super.send(producerExchange, message);
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
blockWhileLoadingCamelRoutes();
super.acknowledge(consumerExchange, ack);
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
blockWhileLoadingCamelRoutes();
return super.messagePull(context, pull);
}
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
blockWhileLoadingCamelRoutes();
super.processConsumerControl(consumerExchange, control);
}
@Override
public void reapplyInterceptor() {
blockWhileLoadingCamelRoutes();
super.reapplyInterceptor();
}
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
blockWhileLoadingCamelRoutes();
super.beginTransaction(context, xid);
}
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
blockWhileLoadingCamelRoutes();
return super.prepareTransaction(context, xid);
}
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
blockWhileLoadingCamelRoutes();
super.rollbackTransaction(context, xid);
}
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
blockWhileLoadingCamelRoutes();
super.commitTransaction(context, xid, onePhase);
}
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
blockWhileLoadingCamelRoutes();
super.forgetTransaction(context, transactionId);
}
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
blockWhileLoadingCamelRoutes();
super.preProcessDispatch(messageDispatch);
}
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
blockWhileLoadingCamelRoutes();
super.postProcessDispatch(messageDispatch);
}
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
blockWhileLoadingCamelRoutes();
return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
}
@Override
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
blockWhileLoadingCamelRoutes();
super.messageConsumed(context, messageReference);
}
@Override
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
blockWhileLoadingCamelRoutes();
super.messageDelivered(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
blockWhileLoadingCamelRoutes();
super.messageDiscarded(context, sub, messageReference);
}
@Override
public void isFull(ConnectionContext context, Destination destination, Usage<?> usage) {
blockWhileLoadingCamelRoutes();
super.isFull(context, destination, usage);
}
@Override
public void nowMasterBroker() {
blockWhileLoadingCamelRoutes();
super.nowMasterBroker();
}
/*
* Properties
*/
public String getRoutesFile() {
return routesFile;
}
public void setRoutesFile(String routesFile) {
this.routesFile = routesFile;
}
public int getCheckPeriod() {
return checkPeriod;
}
public void setCheckPeriod(int checkPeriod) {
this.checkPeriod = checkPeriod;
}
public CamelRoutesBroker(Broker next) {
super(next);
}
@Override
public void start() throws Exception {
super.start();
LOG.info("Starting CamelRoutesBroker");
camelContext = new DefaultCamelContext();
camelContext.setName("EmbeddedCamel-" + getBrokerName());
camelContext.start();
getBrokerService().getScheduler().executePeriodically(new Runnable() {
@Override
public void run() {
try {
loadCamelRoutes();
} catch (Throwable e) {
LOG.error("Failed to load Camel Routes", e);
}
}
}, getCheckPeriod());
}
@Override
public void stop() throws Exception {
CountDownLatch latch = this.countDownLatch;
if (latch != null){
latch.countDown();
}
if (camelContext != null){
camelContext.stop();
}
super.stop();
}
private void loadCamelRoutes() throws Exception{
if (theRoutes == null) {
String fileToUse = getRoutesFile();
if (fileToUse == null || fileToUse.trim().isEmpty()) {
BrokerContext brokerContext = getBrokerService().getBrokerContext();
if (brokerContext != null) {
String uri = brokerContext.getConfigurationUrl();
Resource resource = Utils.resourceFromString(uri);
if (resource.exists()) {
fileToUse = resource.getFile().getParent();
fileToUse += File.separator;
fileToUse += "routes.xml";
}
}
}
if (fileToUse != null && !fileToUse.isEmpty()){
theRoutes = Utils.resourceFromString(fileToUse);
setRoutesFile(theRoutes.getFile().getAbsolutePath());
}
}
if (!isStopped() && camelContext != null && theRoutes != null && theRoutes.exists()){
long lastModified = theRoutes.lastModified();
if (lastModified != lastRoutesModified){
CountDownLatch latch = new CountDownLatch(1);
this.countDownLatch = latch;
lastRoutesModified = lastModified;
List<RouteDefinition> currentRoutes = camelContext.getRouteDefinitions();
for (RouteDefinition rd:currentRoutes){
camelContext.stopRoute(rd);
camelContext.removeRouteDefinition(rd);
}
InputStream is = theRoutes.getInputStream();
RoutesDefinition routesDefinition = camelContext.loadRoutesDefinition(is);
for (RouteDefinition rd: routesDefinition.getRoutes()){
camelContext.startRoute(rd);
}
is.close();
latch.countDown();
this.countDownLatch=null;
}
}
}
private void blockWhileLoadingCamelRoutes(){
CountDownLatch latch = this.countDownLatch;
if (latch != null){
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}