blob: 6df655e13b0ed201a49973a0d63a63d416a63419 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ThreadPoolExecutor;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ManagedRegionBroker extends RegionBroker {
private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
private final ManagementContext managementContext;
private final ObjectName brokerObjectName;
private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
this.managementContext = context;
this.brokerObjectName = brokerObjectName;
}
@Override
public void start() throws Exception {
super.start();
// build all existing durable subscriptions
buildExistingSubscriptions();
}
@Override
protected void doStop(ServiceStopper stopper) {
super.doStop(stopper);
// lets remove any mbeans not yet removed
for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
ObjectName name = iter.next();
try {
managementContext.unregisterMBean(name);
} catch (InstanceNotFoundException e) {
LOG.warn("The MBean: " + name + " is no longer registered with JMX");
} catch (Exception e) {
stopper.onException(this, e);
}
}
registeredMBeans.clear();
}
@Override
protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
@Override
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
@Override
protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
@Override
protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
public void register(ActiveMQDestination destName, Destination destination) {
// TODO refactor to allow views for custom destinations
try {
ObjectName objectName = createObjectName(destName);
DestinationView view;
if (destination instanceof Queue) {
view = new QueueView(this, (Queue)destination);
} else if (destination instanceof Topic) {
view = new TopicView(this, (Topic)destination);
} else {
view = null;
LOG.warn("JMX View is not supported for custom destination: " + destination);
}
if (view != null) {
registerDestination(objectName, destName, view);
}
} catch (Exception e) {
LOG.error("Failed to register destination " + destName, e);
}
}
public void unregister(ActiveMQDestination destName) {
try {
ObjectName objectName = createObjectName(destName);
unregisterDestination(objectName);
} catch (Exception e) {
LOG.error("Failed to unregister " + destName, e);
}
}
public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
String connectionClientId = context.getClientId();
ObjectName brokerJmxObjectName = brokerObjectName;
String objectNameStr = getSubscriptionObjectName(sub, connectionClientId, brokerJmxObjectName);
SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
try {
ObjectName objectName = new ObjectName(objectNameStr);
SubscriptionView view;
if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
// add offline subscribers to inactive list
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(context.getClientId());
info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
info.setDestination(sub.getConsumerInfo().getDestination());
addInactiveSubscription(key, info);
} else {
if (sub.getConsumerInfo().isDurable()) {
view = new DurableSubscriptionView(this, context.getClientId(), sub);
} else {
if (sub instanceof TopicSubscription) {
view = new TopicSubscriptionView(context.getClientId(), (TopicSubscription) sub);
} else {
view = new SubscriptionView(context.getClientId(), sub);
}
}
registerSubscription(objectName, sub.getConsumerInfo(), key, view);
}
subscriptionMap.put(sub, objectName);
return objectName;
} catch (Exception e) {
LOG.error("Failed to register subscription " + sub, e);
return null;
}
}
public static String getSubscriptionObjectName(Subscription sub, String connectionClientId, ObjectName brokerJmxObjectName) {
Hashtable map = brokerJmxObjectName.getKeyPropertyList();
String brokerDomain = brokerJmxObjectName.getDomain();
String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
String destinationType = "destinationType=" + sub.getConsumerInfo().getDestination().getDestinationTypeAsString();
String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getDestination().getPhysicalName());
String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
String persistentMode = "persistentMode=";
String consumerId = "";
if (sub.getConsumerInfo().isDurable()) {
persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getSubscriptionName());
} else {
persistentMode += "Non-Durable";
if (sub.getConsumerInfo() != null && sub.getConsumerInfo().getConsumerId() != null) {
consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().getConsumerId().toString());
}
}
objectNameStr += persistentMode + ",";
objectNameStr += destinationType + ",";
objectNameStr += destinationName + ",";
objectNameStr += clientId;
objectNameStr += consumerId;
return objectNameStr;
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription sub = super.addConsumer(context, info);
SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
if (inactiveName != null) {
// if it was inactive, register it
registerSubscription(context, sub);
}
return sub;
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
for (Subscription sub : subscriptionMap.keySet()) {
if (sub.getConsumerInfo().equals(info)) {
// unregister all consumer subs
unregisterSubscription(subscriptionMap.get(sub), true);
}
}
super.removeConsumer(context, info);
}
public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub);
if (name != null) {
try {
SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
if (inactiveName != null) {
inactiveDurableTopicSubscribers.remove(inactiveName);
managementContext.unregisterMBean(inactiveName);
}
} catch (Exception e) {
LOG.error("Failed to unregister subscription " + sub, e);
}
}
}
protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
if (dest.isQueue()) {
if (dest.isTemporary()) {
temporaryQueues.put(key, view);
} else {
queues.put(key, view);
}
} else {
if (dest.isTemporary()) {
temporaryTopics.put(key, view);
} else {
topics.put(key, view);
}
}
try {
AnnotatedMBean.registerMBean(managementContext, view, key);
registeredMBeans.add(key);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
protected void unregisterDestination(ObjectName key) throws Exception {
DestinationView view = null;
removeAndRemember(topics, key, view);
removeAndRemember(queues, key, view);
removeAndRemember(temporaryQueues, key, view);
removeAndRemember(temporaryTopics, key, view);
if (registeredMBeans.remove(key)) {
try {
managementContext.unregisterMBean(key);
} catch (Throwable e) {
LOG.warn("Failed to unregister MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
if (view != null) {
key = view.getSlowConsumerStrategy();
if (key!= null && registeredMBeans.remove(key)) {
try {
managementContext.unregisterMBean(key);
} catch (Throwable e) {
LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
}
}
private void removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
DestinationView candidate = map.remove(key);
if (candidate != null && view == null) {
view = candidate;
}
}
protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
ActiveMQDestination dest = info.getDestination();
if (dest.isQueue()) {
if (dest.isTemporary()) {
temporaryQueueSubscribers.put(key, view);
} else {
queueSubscribers.put(key, view);
}
} else {
if (dest.isTemporary()) {
temporaryTopicSubscribers.put(key, view);
} else {
if (info.isDurable()) {
durableTopicSubscribers.put(key, view);
// unregister any inactive durable subs
try {
ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
if (inactiveName != null) {
inactiveDurableTopicSubscribers.remove(inactiveName);
registeredMBeans.remove(inactiveName);
managementContext.unregisterMBean(inactiveName);
}
} catch (Throwable e) {
LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
}
} else {
topicSubscribers.put(key, view);
}
}
}
try {
AnnotatedMBean.registerMBean(managementContext, view, key);
registeredMBeans.add(key);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
queueSubscribers.remove(key);
topicSubscribers.remove(key);
temporaryQueueSubscribers.remove(key);
temporaryTopicSubscribers.remove(key);
if (registeredMBeans.remove(key)) {
try {
managementContext.unregisterMBean(key);
} catch (Throwable e) {
LOG.warn("Failed to unregister MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
}
DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
if (view != null) {
// need to put this back in the inactive list
SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
if (addToInactive) {
SubscriptionInfo info = new SubscriptionInfo();
info.setClientId(subscriptionKey.getClientId());
info.setSubscriptionName(subscriptionKey.getSubscriptionName());
info.setDestination(new ActiveMQTopic(view.getDestinationName()));
addInactiveSubscription(subscriptionKey, info);
}
}
}
protected void buildExistingSubscriptions() throws Exception {
Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
Set destinations = destinationFactory.getDestinations();
if (destinations != null) {
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
ActiveMQDestination dest = (ActiveMQDestination)iter.next();
if (dest.isTopic()) {
SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
if (infos != null) {
for (int i = 0; i < infos.length; i++) {
SubscriptionInfo info = infos[i];
SubscriptionKey key = new SubscriptionKey(info);
if (!alreadyKnown(key)) {
LOG.debug("Restoring durable subscription mbean: " + info);
subscriptions.put(key, info);
}
}
}
}
}
}
for (Iterator i = subscriptions.entrySet().iterator(); i.hasNext();) {
Map.Entry entry = (Entry)i.next();
SubscriptionKey key = (SubscriptionKey)entry.getKey();
SubscriptionInfo info = (SubscriptionInfo)entry.getValue();
addInactiveSubscription(key, info);
}
}
private boolean alreadyKnown(SubscriptionKey key) {
boolean known = false;
known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
if (LOG.isTraceEnabled()) {
LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") + " already registered");
}
return known;
}
protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
Hashtable map = brokerObjectName.getKeyPropertyList();
try {
ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type=Subscription," + "active=false,"
+ "name=" + JMXSupport.encodeObjectNamePart(key.toString()) + "");
SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info);
try {
AnnotatedMBean.registerMBean(managementContext, view, objectName);
registeredMBeans.add(objectName);
} catch (Throwable e) {
LOG.warn("Failed to register MBean: " + key);
LOG.debug("Failure reason: " + e, e);
}
inactiveDurableTopicSubscribers.put(objectName, view);
subscriptionKeys.put(key, objectName);
} catch (Exception e) {
LOG.error("Failed to register subscription " + info, e);
}
}
public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
List<Message> messages = getSubscriberMessages(view);
CompositeData c[] = new CompositeData[messages.size()];
for (int i = 0; i < c.length; i++) {
try {
c[i] = OpenTypeSupport.convert(messages.get(i));
} catch (Throwable e) {
LOG.error("failed to browse : " + view, e);
}
}
return c;
}
public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
List<Message> messages = getSubscriberMessages(view);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
TabularDataSupport rc = new TabularDataSupport(tt);
for (int i = 0; i < messages.size(); i++) {
rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
}
return rc;
}
protected List<Message> getSubscriberMessages(SubscriptionView view) {
// TODO It is very dangerous operation for big backlogs
if (!(destinationFactory instanceof DestinationFactoryImpl)) {
throw new RuntimeException("unsupported by " + destinationFactory);
}
PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
final List<Message> result = new ArrayList<Message>();
try {
ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
TopicMessageStore store = adapter.createTopicMessageStore(topic);
store.recover(new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception {
result.add(message);
return true;
}
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
throw new RuntimeException("Should not be called.");
}
public boolean hasSpace() {
return true;
}
public boolean isDuplicate(MessageId id) {
return false;
}
});
} catch (Throwable e) {
LOG.error("Failed to browse messages for Subscription " + view, e);
}
return result;
}
protected ObjectName[] getTopics() {
Set<ObjectName> set = topics.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getQueues() {
Set<ObjectName> set = queues.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryTopics() {
Set<ObjectName> set = temporaryTopics.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryQueues() {
Set<ObjectName> set = temporaryQueues.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTopicSubscribers() {
Set<ObjectName> set = topicSubscribers.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getDurableTopicSubscribers() {
Set<ObjectName> set = durableTopicSubscribers.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getQueueSubscribers() {
Set<ObjectName> set = queueSubscribers.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryTopicSubscribers() {
Set<ObjectName> set = temporaryTopicSubscribers.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryQueueSubscribers() {
Set<ObjectName> set = temporaryQueueSubscribers.keySet();
return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getInactiveDurableTopicSubscribers() {
Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
return set.toArray(new ObjectName[set.size()]);
}
public Broker getContextBroker() {
return contextBroker;
}
public void setContextBroker(Broker contextBroker) {
this.contextBroker = contextBroker;
}
protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
// Build the object name for the destination
Hashtable map = brokerObjectName.getKeyPropertyList();
ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
+ JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
+ JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
return objectName;
}
public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
ObjectName objectName = null;
try {
objectName = createObjectName(strategy);
if (!registeredMBeans.contains(objectName)) {
AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
AnnotatedMBean.registerMBean(managementContext, view, objectName);
registeredMBeans.add(objectName);
}
} catch (Exception e) {
LOG.warn("Failed to register MBean: " + strategy);
LOG.debug("Failure reason: " + e, e);
}
return objectName;
}
private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
Hashtable map = brokerObjectName.getKeyPropertyList();
ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
+ "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
return objectName;
}
public ObjectName getSubscriberObjectName(Subscription key) {
return subscriptionMap.get(key);
}
public Subscription getSubscriber(ObjectName key) {
Subscription sub = null;
for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
if (entry.getValue().equals(key)) {
sub = entry.getKey();
break;
}
}
return sub;
}
}