blob: 8e4e6099d0868aa42298d0f3fe0c8b475a11cc97 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.management.amqp;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.security.auth.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.SystemAddressSpaceCreator;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.DtxNotSupportedException;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
public class ManagementAddressSpace implements NamedAddressSpace
{
public static final String MANAGEMENT_ADDRESS_SPACE_NAME = "$management";
private static final String MANAGEMENT_NODE_NAME = "$management";
private static final Logger LOGGER = LoggerFactory.getLogger(ManagementAddressSpace.class);
private final String _name;
private final SystemAddressSpaceCreator.AddressSpaceRegistry _addressSpaceRegistry;
private final ManagementNode _managementNode;
private final VirtualHostPropertiesNode _propertiesNode;
private final MessageStore _messageStore;
private final MessageDestination _defaultDestination = new DefaultDestination();
private final List<AMQPConnection<?>> _connections = new CopyOnWriteArrayList<>();
private final LinkRegistry _linkRegistry = new NonDurableLinkRegistry();
private final Broker<?> _broker;
private final Principal _principal;
private final UUID _id;
private final ConcurrentMap<Object, ConcurrentMap<String, ProxyMessageSource>> _connectionSpecificDestinations = new ConcurrentHashMap<>();
public ManagementAddressSpace(final SystemAddressSpaceCreator.AddressSpaceRegistry addressSpaceRegistry)
{
this(MANAGEMENT_ADDRESS_SPACE_NAME, addressSpaceRegistry);
}
public ManagementAddressSpace(String name, final SystemAddressSpaceCreator.AddressSpaceRegistry addressSpaceRegistry)
{
_name = name;
_addressSpaceRegistry = addressSpaceRegistry;
_broker = addressSpaceRegistry.getBroker();
_managementNode = new ManagementNode(this, addressSpaceRegistry.getBroker());
_propertiesNode = new VirtualHostPropertiesNode(this);
_messageStore = new MemoryMessageStore();
_principal = new ManagementAddressSpacePrincipal(this);
_id = UUID.nameUUIDFromBytes((_broker.getId().toString()+"/"+name).getBytes(StandardCharsets.UTF_8));
}
@Override
public UUID getId()
{
return _id;
}
@Override
public MessageSource getAttainedMessageSource(final String name)
{
if(_managementNode.getName().equals(name))
{
return _managementNode;
}
else if(_propertiesNode.getName().equals(name))
{
return _propertiesNode;
}
else
{
return getProxyNode(name);
}
}
@Override
public MessageDestination getAttainedMessageDestination(final String name)
{
if(_managementNode.getName().equals(name))
{
return _managementNode;
}
else
{
MessageDestination connectionSpecificDestinations = getProxyNode(name);
if (connectionSpecificDestinations != null) return connectionSpecificDestinations;
}
return null;
}
ProxyMessageSource getProxyNode(final String name)
{
LOGGER.debug("RG: looking for proxy source {}", name);
Subject currentSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = currentSubject.getPrincipals(SessionPrincipal.class);
if (!sessionPrincipals.isEmpty())
{
Object connectionReference = sessionPrincipals.iterator().next().getSession().getConnectionReference();
Map<String, ProxyMessageSource>
connectionSpecificDestinations = _connectionSpecificDestinations.get(connectionReference);
if(connectionSpecificDestinations != null)
{
LOGGER.debug("RG: ", connectionSpecificDestinations);
return connectionSpecificDestinations.get(name);
}
}
return null;
}
ManagementNode getManagementNode()
{
return _managementNode;
}
@Override
public void registerConnection(final AMQPConnection<?> connection)
{
_connections.add(connection);
}
@Override
public void deregisterConnection(final AMQPConnection<?> connection)
{
_connections.remove(connection);
}
@Override
public String getRedirectHost(final AmqpPort<?> port)
{
return null;
}
@Override
public Principal getPrincipal()
{
return _principal;
}
@Override
public boolean isActive()
{
return true;
}
@Override
public MessageDestination getDefaultDestination()
{
return _defaultDestination;
}
@Override
public LinkRegistry getLinkRegistry(final String remoteContainerId)
{
return _linkRegistry;
}
@Override
public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
_broker.authorise(Operation.ACTION("manage"));
return true;
}
@Override
public DtxRegistry getDtxRegistry()
{
throw new DtxNotSupportedException("Distributed Transactions are not supported within this address space");
}
@Override
public MessageStore getMessageStore()
{
return _messageStore;
}
@Override
public <T extends MessageSource> T createMessageSource(final Class<T> clazz, final Map<String, Object> attributes)
{
if(clazz == MessageSource.class)
{
return (T) createProxyNode(attributes);
}
else
{
return null;
}
}
private ProxyMessageSource createProxyNode(final Map<String, Object> attributes)
{
Subject currentSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = currentSubject.getPrincipals(SessionPrincipal.class);
if (!sessionPrincipals.isEmpty())
{
final ProxyMessageSource proxyMessageSource = new ProxyMessageSource(this, attributes);
final AMQSessionModel session = sessionPrincipals.iterator().next().getSession();
final Object connectionReference = session.getConnectionReference();
ConcurrentMap<String, ProxyMessageSource> connectionSpecificDestinations =
_connectionSpecificDestinations.get(connectionReference);
if (connectionSpecificDestinations == null)
{
connectionSpecificDestinations = new ConcurrentHashMap<>();
if(_connectionSpecificDestinations.putIfAbsent(connectionReference, connectionSpecificDestinations) == null)
{
session.getAMQPConnection().addDeleteTask(new Action()
{
@Override
public void performAction(final Object object)
{
_connectionSpecificDestinations.remove(connectionReference);
}
});
}
}
connectionSpecificDestinations.put(proxyMessageSource.getName(), proxyMessageSource);
return proxyMessageSource;
}
else
{
return null;
}
}
void removeProxyMessageSource(final Object connectionReference, final String name)
{
ConcurrentMap<String, ProxyMessageSource> connectionSpecificDestinations =
_connectionSpecificDestinations.get(connectionReference);
if(connectionSpecificDestinations != null)
{
connectionSpecificDestinations.remove(name);
}
}
@Override
public <T extends MessageDestination> T createMessageDestination(final Class<T> clazz,
final Map<String, Object> attributes)
{
if(clazz == MessageDestination.class)
{
return (T) createProxyNode(attributes);
}
else
{
return null;
}
}
@Override
public boolean hasMessageSources()
{
return true;
}
@Override
public Collection<? extends Connection<?>> getConnections()
{
return Collections.unmodifiableList(_connections);
}
@Override
public String getName()
{
return _name;
}
@Override
public List<String> getGlobalAddressDomains()
{
return Collections.emptyList();
}
private class DefaultDestination implements MessageDestination
{
@Override
public NamedAddressSpace getAddressSpace()
{
return ManagementAddressSpace.this;
}
@Override
public void authorisePublish(final SecurityToken token, final Map<String, Object> arguments)
throws AccessControlException
{
}
@Override
public String getName()
{
return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
}
@Override
public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final String routingAddress,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
MessageDestination destination = getAttainedMessageDestination(routingAddress);
if(destination == null || destination == this)
{
return 0;
}
else
{
return destination.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
}
}
@Override
public boolean isDurable()
{
return true;
}
@Override
public void linkAdded(final MessageSender sender, final PublishingLink link)
{
}
@Override
public void linkRemoved(final MessageSender sender, final PublishingLink link)
{
}
}
private class NonDurableLinkRegistry implements LinkRegistry
{
@Override
public LinkModel getDurableSendingLink(final String name)
{
return null;
}
@Override
public boolean registerSendingLink(final String name, final LinkModel link)
{
throw new ConnectionScopedRuntimeException("Durable links are not supported");
}
@Override
public boolean unregisterSendingLink(final String name)
{
return false;
}
@Override
public LinkModel getDurableReceivingLink(final String name)
{
return null;
}
@Override
public boolean registerReceivingLink(final String name, final LinkModel link)
{
throw new ConnectionScopedRuntimeException("Durable links are not supported");
}
}
}