blob: ea5af028487522265446e9a134f9d7be8696363e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.*;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Creates <a href="http://activemq.org/site/mirrored-queues.html">Mirrored
* Queue</a> using a prefix and postfix to define the topic name on which to mirror the queue to.
*
*
* @org.apache.xbean.XBean
*/
public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware {
private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueue.class);
private String prefix = "VirtualTopic.Mirror.";
private String postfix = "";
private boolean copyMessage = true;
private BrokerService brokerService;
public Destination intercept(final Destination destination) {
if (destination.getActiveMQDestination().isQueue()) {
if (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues()) {
try {
final Destination mirrorDestination = getMirrorDestination(destination);
if (mirrorDestination != null) {
return new DestinationFilter(destination) {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
message.setDestination(mirrorDestination.getActiveMQDestination());
mirrorDestination.send(context, message);
if (isCopyMessage()) {
message = message.copy();
}
message.setDestination(destination.getActiveMQDestination());
super.send(context, message);
}
};
}
}
catch (Exception e) {
LOG.error("Failed to lookup the mirror destination for: " + destination + ". Reason: " + e, e);
}
}
}
return destination;
}
public void remove(Destination destination) {
if (brokerService == null) {
throw new IllegalArgumentException("No brokerService injected!");
}
ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
if (topic != null) {
try {
brokerService.removeDestination(topic);
} catch (Exception e) {
LOG.error("Failed to remove mirror destination for " + destination + ". Reason: " + e,e);
}
}
}
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {}
// Properties
// -------------------------------------------------------------------------
public String getPostfix() {
return postfix;
}
/**
* Sets any postix used to identify the queue consumers
*/
public void setPostfix(String postfix) {
this.postfix = postfix;
}
public String getPrefix() {
return prefix;
}
/**
* Sets the prefix wildcard used to identify the queue consumers for a given
* topic
*/
public void setPrefix(String prefix) {
this.prefix = prefix;
}
public boolean isCopyMessage() {
return copyMessage;
}
/**
* Sets whether a copy of the message will be sent to each destination.
* Defaults to true so that the forward destination is set as the
* destination of the message
*/
public void setCopyMessage(boolean copyMessage) {
this.copyMessage = copyMessage;
}
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
// Implementation methods
//-------------------------------------------------------------------------
protected Destination getMirrorDestination(Destination destination) throws Exception {
if (brokerService == null) {
throw new IllegalArgumentException("No brokerService injected!");
}
ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
return brokerService.getDestination(topic);
}
protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) {
return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix);
}
}