blob: 84fa230615b25da61d003751312586573349bad6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
/**
* Used to add listeners for Broker actions
*
*
*/
public class BrokerBroadcaster extends BrokerFilter {
protected volatile Broker[] listeners = new Broker[0];
public BrokerBroadcaster(Broker next) {
super(next);
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
next.acknowledge(consumerExchange, ack);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].acknowledge(consumerExchange, ack);
}
}
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
next.addConnection(context, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].addConnection(context, info);
}
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription answer = next.addConsumer(context, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].addConsumer(context, info);
}
return answer;
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.addProducer(context, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].addProducer(context, info);
}
}
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
next.commitTransaction(context, xid, onePhase);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].commitTransaction(context, xid, onePhase);
}
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
next.removeSubscription(context, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].removeSubscription(context, info);
}
}
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
int result = next.prepareTransaction(context, xid);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
// TODO decide what to do with return values
brokers[i].prepareTransaction(context, xid);
}
return result;
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
next.removeConnection(context, info, error);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].removeConnection(context, info, error);
}
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
next.removeConsumer(context, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].removeConsumer(context, info);
}
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.removeProducer(context, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].removeProducer(context, info);
}
}
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
next.rollbackTransaction(context, xid);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].rollbackTransaction(context, xid);
}
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
next.send(producerExchange, messageSend);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].send(producerExchange, messageSend);
}
}
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
next.beginTransaction(context, xid);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].beginTransaction(context, xid);
}
}
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
next.forgetTransaction(context, transactionId);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].forgetTransaction(context, transactionId);
}
}
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
Destination result = next.addDestination(context, destination,createIfTemporary);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].addDestination(context, destination,createIfTemporary);
}
return result;
}
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
next.removeDestination(context, destination, timeout);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].removeDestination(context, destination, timeout);
}
}
@Override
public void start() throws Exception {
next.start();
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].start();
}
}
@Override
public void stop() throws Exception {
next.stop();
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].stop();
}
}
@Override
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
next.addSession(context, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].addSession(context, info);
}
}
@Override
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
next.removeSession(context, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].removeSession(context, info);
}
}
@Override
public void gc() {
next.gc();
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].gc();
}
}
@Override
public void addBroker(Connection connection, BrokerInfo info) {
next.addBroker(connection, info);
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
brokers[i].addBroker(connection, info);
}
}
protected Broker[] getListeners() {
return listeners;
}
public synchronized void addListener(Broker broker) {
List<Broker> tmp = getListenersAsList();
tmp.add(broker);
listeners = tmp.toArray(new Broker[tmp.size()]);
}
public synchronized void removeListener(Broker broker) {
List<Broker> tmp = getListenersAsList();
tmp.remove(broker);
listeners = tmp.toArray(new Broker[tmp.size()]);
}
protected List<Broker> getListenersAsList() {
List<Broker> tmp = new ArrayList<Broker>();
Broker brokers[] = getListeners();
for (int i = 0; i < brokers.length; i++) {
tmp.add(brokers[i]);
}
return tmp;
}
}