blob: 6fa1f53648743e6373e9a40c9278f1b916e49462 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunnerFactory;
/**
* The Topic is a destination that sends a copy of a message to every active
* Subscription registered.
*
*
*/
public class TempTopic extends Topic implements Task{
private final ActiveMQTempDestination tempDest;
/**
* @param brokerService
* @param destination
* @param store
* @param parentStats
* @param taskFactory
* @throws Exception
*/
public TempTopic(BrokerService brokerService,
ActiveMQDestination destination, TopicMessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
throws Exception {
super(brokerService, destination, store, parentStats, taskFactory);
this.tempDest = (ActiveMQTempDestination) destination;
}
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
// However, we could have failed over - and we do this
// check client side anyways ....
if (!context.isFaultTolerant()
&& (!context.isNetworkConnection() && !tempDest
.getConnectionId().equals(
sub.getConsumerInfo().getConsumerId()
.getConnectionId()))) {
tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
}
super.addSubscription(context, sub);
}
public void initialize() {
}
}