| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.configuration; |
| |
| import org.apache.commons.configuration.Configuration; |
| import org.apache.commons.configuration.ConfigurationException; |
| |
| import org.apache.qpid.server.binding.Binding; |
| import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; |
| import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; |
| import org.apache.qpid.server.exchange.TopicExchange; |
| import org.apache.qpid.server.queue.AMQQueue; |
| |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| public class TopicConfiguration extends ConfigurationPlugin implements ExchangeConfigurationPlugin |
| { |
| public static final ConfigurationPluginFactory FACTORY = new TopicConfigurationFactory(); |
| |
| private static final String VIRTUALHOSTS_VIRTUALHOST_TOPICS = "virtualhosts.virtualhost.topics"; |
| |
| public static class TopicConfigurationFactory implements ConfigurationPluginFactory |
| { |
| |
| public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException |
| { |
| TopicConfiguration topicsConfig = new TopicConfiguration(); |
| topicsConfig.setConfiguration(path, config); |
| return topicsConfig; |
| } |
| |
| public List<String> getParentPaths() |
| { |
| return Arrays.asList(VIRTUALHOSTS_VIRTUALHOST_TOPICS); |
| } |
| } |
| |
| private Map<String, TopicConfig> _topics = new HashMap<String, TopicConfig>(); |
| private Map<String, Map<String, TopicConfig>> _subscriptions = new HashMap<String, Map<String, TopicConfig>>(); |
| |
| public String[] getElementsProcessed() |
| { |
| return new String[]{"topic"}; |
| } |
| |
| @Override |
| public void validateConfiguration() throws ConfigurationException |
| { |
| if (getConfig().isEmpty()) |
| { |
| throw new ConfigurationException("Topics section cannot be empty."); |
| } |
| |
| int topics = getConfig().getList("topic.name").size() + |
| getConfig().getList("topic.subscriptionName").size(); |
| |
| for (int index = 0; index < topics; index++) |
| { |
| Configuration topicSubset = getConfig().subset("topic(" + index + ")"); |
| |
| // This will occur when we have a subscriptionName that is bound to a |
| // topic. |
| if (topicSubset.isEmpty()) |
| { |
| break; |
| } |
| |
| TopicConfig topic = new TopicConfig(); |
| |
| topic.setConfiguration(VIRTUALHOSTS_VIRTUALHOST_TOPICS + ".topic", topicSubset ); |
| |
| String name = getConfig().getString("topic(" + index + ").name"); |
| String subscriptionName = getConfig().getString("topic(" + index + ").subscriptionName"); |
| |
| // Record config if subscriptionName is there |
| if (subscriptionName != null) |
| { |
| processSubscription(subscriptionName, topic); |
| } |
| else |
| { |
| // Otherwise record config as topic if we have the name |
| if (name != null) |
| { |
| processTopic(name, topic); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param name |
| * @param topic |
| * |
| * @throws org.apache.commons.configuration.ConfigurationException |
| * |
| */ |
| private void processTopic(String name, TopicConfig topic) throws ConfigurationException |
| { |
| if (_topics.containsKey(name)) |
| { |
| throw new ConfigurationException("Topics section cannot contain two entries for the same topic."); |
| } |
| else |
| { |
| _topics.put(name, topic); |
| } |
| } |
| |
| |
| private void processSubscription(String name, TopicConfig topic) throws ConfigurationException |
| { |
| Map<String,TopicConfig> topics; |
| if (_subscriptions.containsKey(name)) |
| { |
| topics = _subscriptions.get(name); |
| |
| if (topics.containsKey(topic.getName())) |
| { |
| throw new ConfigurationException("Subcription cannot contain two entries for the same topic."); |
| } |
| } |
| else |
| { |
| topics = new HashMap<String,TopicConfig>(); |
| } |
| |
| topics.put(topic.getName(),topic); |
| _subscriptions.put(name, topics); |
| |
| } |
| |
| @Override |
| public String formatToString() |
| { |
| return "Topics:" + _topics + ", Subscriptions:" + _subscriptions; |
| } |
| |
| /** |
| * This processes the given queue and apply configuration in the following |
| * order: |
| * |
| * Global Topic Values -> Topic Values -> Subscription Values |
| * |
| * @param queue |
| * |
| * @return |
| */ |
| public ConfigurationPlugin getConfiguration(AMQQueue queue) |
| { |
| //Create config with global topic configuration |
| TopicConfig config = new TopicConfig(); |
| |
| // Add global topic configuration |
| config.addConfiguration(this); |
| |
| // Process Topic Bindings as these are more generic than subscriptions |
| List<TopicConfig> boundToTopics = new LinkedList<TopicConfig>(); |
| |
| //Merge the configuration in the order that they are bound |
| for (Binding binding : queue.getBindings()) |
| { |
| if (binding.getExchange().getType().equals(TopicExchange.TYPE)) |
| { |
| // Identify topic for the binding key |
| TopicConfig topicConfig = getTopicConfigForRoutingKey(binding.getBindingKey()); |
| if (topicConfig != null) |
| { |
| boundToTopics.add(topicConfig); |
| } |
| } |
| } |
| |
| // If the Queue is bound to a number of topics then only use the global |
| // topic configuration. |
| // todo - What does it mean in terms of configuration to be bound to a |
| // number of topics? Do we try and merge? |
| // YES - right thing to do would be to merge from generic to specific. |
| // Means we need to be able to get an ordered list of topics for this |
| // binding. |
| if (boundToTopics.size() == 1) |
| { |
| config.addConfiguration(boundToTopics.get(0)); |
| } |
| |
| // If we have a subscription then attempt to look it up. |
| String subscriptionName = queue.getName(); |
| |
| // Apply subscription configurations |
| if (_subscriptions.containsKey(subscriptionName)) |
| { |
| |
| //Get all the Configuration that this subscription is bound to. |
| Map<String, TopicConfig> topics = _subscriptions.get(subscriptionName); |
| |
| TopicConfig subscriptionSpecificConfig = null; |
| |
| // See if we have a TopicConfig in topics for a topic we are bound to. |
| for (Binding binding : queue.getBindings()) |
| { |
| if (binding.getExchange().getType().equals(TopicExchange.TYPE)) |
| { |
| //todo - What does it mean to have multiple matches? |
| // Take the first match we get |
| if (subscriptionSpecificConfig == null) |
| { |
| // lookup the binding to see if we have a match in the subscription configs |
| subscriptionSpecificConfig = topics.get(binding.getBindingKey()); |
| } |
| } |
| } |
| |
| //todo we don't account for wild cards here. only explicit matching and all subscriptions |
| if (subscriptionSpecificConfig == null) |
| { |
| // lookup the binding to see if we have a match in the subscription configs |
| subscriptionSpecificConfig = topics.get("#"); |
| } |
| |
| // Apply subscription specific config. |
| if (subscriptionSpecificConfig != null) |
| { |
| config.addConfiguration(subscriptionSpecificConfig); |
| } |
| } |
| return config; |
| } |
| |
| /** |
| * This method should perform the same heuristics as the TopicExchange |
| * to attempt to identify a piece of configuration for the give routingKey. |
| * |
| * i.e. If we have 'stocks.*' defined in the config |
| * and we bind 'stocks.appl' then we should return the 'stocks.*' |
| * configuration. |
| * |
| * @param routingkey the key to lookup |
| * |
| * @return the TopicConfig if found. |
| */ |
| private TopicConfig getTopicConfigForRoutingKey(String routingkey) |
| { |
| //todo actually perform TopicExchange style lookup not just straight |
| // lookup as we are just now. |
| return _topics.get(routingkey); |
| } |
| |
| } |