blob: 3a1a66d484aab4a205b272f5dd09dccc3242b53b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
/**
* This broker filter handles composite destinations. If a broker operation is
* invoked using a composite destination, this filter repeats the operation
* using each destination of the composite. HRC: I think this filter is
* dangerous to use with the consumer operations. Multiple Subscription
* objects will be associated with a single JMS consumer each having a different
* idea of what the current pre-fetch dispatch size is. If this is used, then
* the client has to expect many more messages to be dispatched than the
* pre-fetch setting allows.
*
*
*/
public class CompositeDestinationBroker extends BrokerFilter {

    /**
     * Creates the filter in front of the given broker.
     *
     * @param next the broker every (possibly fanned-out) operation is
     *             delegated to
     */
    public CompositeDestinationBroker(Broker next) {
        super(next);
    }

    /**
     * Registers a producer. When the producer's destination is a composite,
     * one copy of the {@link ProducerInfo} is registered with the next broker
     * per component destination; otherwise the info is passed through
     * unchanged.
     *
     * @param context the connection context, forwarded as-is
     * @param info    the producer being added; its destination may be null
     * @throws Exception whatever the next broker throws
     */
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        ActiveMQDestination target = info.getDestination();

        // Null or plain destination: nothing to fan out.
        if (target == null || !target.isComposite()) {
            next.addProducer(context, info);
            return;
        }

        ActiveMQDestination[] parts = target.getCompositeDestinations();
        for (ActiveMQDestination part : parts) {
            // Each component gets its own ProducerInfo so the caller's
            // instance keeps the composite destination.
            ProducerInfo perDestination = info.copy();
            perDestination.setDestination(part);
            next.addProducer(context, perDestination);
        }
    }

    /**
     * De-registers a producer. Mirrors
     * {@link #addProducer(ConnectionContext, ProducerInfo)}: a composite
     * destination results in one removal per component destination, anything
     * else is passed through unchanged.
     *
     * @param context the connection context, forwarded as-is
     * @param info    the producer being removed; its destination may be null
     * @throws Exception whatever the next broker throws
     */
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        ActiveMQDestination target = info.getDestination();

        // Null or plain destination: nothing to fan out.
        if (target == null || !target.isComposite()) {
            next.removeProducer(context, info);
            return;
        }

        ActiveMQDestination[] parts = target.getCompositeDestinations();
        for (ActiveMQDestination part : parts) {
            ProducerInfo perDestination = info.copy();
            perDestination.setDestination(part);
            next.removeProducer(context, perDestination);
        }
    }

    /**
     * Sends a message. When the message's destination is a composite, the
     * message is sent once per component destination, with the composite
     * recorded as the original destination on every dispatched instance.
     * The first component receives the passed-in message instance itself;
     * every later component receives a copy of the previously dispatched
     * instance with its memory usage reset to null.
     * <p>
     * Unlike the producer operations, the message's destination is
     * dereferenced here without a null check.
     *
     * @param producerExchange the producer exchange, forwarded as-is
     * @param message          the message to send
     * @throws Exception whatever the next broker throws
     */
    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
        ActiveMQDestination composite = message.getDestination();

        if (!composite.isComposite()) {
            next.send(producerExchange, message);
            return;
        }

        ActiveMQDestination[] parts = composite.getCompositeDestinations();
        Message outgoing = message;
        boolean firstDispatch = true;
        for (ActiveMQDestination part : parts) {
            if (firstDispatch) {
                // The caller's instance is reused for the first component.
                firstDispatch = false;
            } else {
                // Later components get a fresh copy of the instance that was
                // just dispatched, detached from its memory usage tracker.
                outgoing = outgoing.copy();
                outgoing.setMemoryUsage(null);
            }
            outgoing.setOriginalDestination(composite);
            outgoing.setDestination(part);
            next.send(producerExchange, outgoing);
        }
    }
}