blob: 8a44330bccb0ca09208715b023e296dabeea8912 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.management.amqp;
import java.security.AccessControlException;
import java.security.AccessController;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.StorableMessageMetaData;
/**
 * A {@link MessageSource} / {@link MessageDestination} that proxies consumption to the management
 * node of a {@link ManagementAddressSpace}.
 * <p>
 * Consumers added here are wrapped so that the management node sees a target whose target address
 * is this source's name; the caller's original target still receives every callback. At most one
 * consumer may be attached at a time. Publishing to this destination is always refused.
 */
public class ProxyMessageSource implements MessageSource, MessageDestination
{
    private static final Logger LOGGER = LoggerFactory.getLogger(ProxyMessageSource.class);

    private final String _name;
    private final UUID _id = UUID.randomUUID();
    private final ManagementAddressSpace _managementAddressSpace;

    /** The consumer handed back to the caller of {@link #addConsumer}, or {@code null} if none is attached. */
    private volatile MessageInstanceConsumer<?> _consumer;

    /** Guards the "single consumer" rule: {@code true} while a consumer is attached or being attached. */
    private final AtomicBoolean _consumerSet = new AtomicBoolean(false);

    /**
     * Connection reference of the session that added the consumer. Volatile, like {@link #_consumer},
     * because it is written in {@link #addConsumer} and read by {@link #verifySessionAccess} and
     * {@code WrappingTarget#close()}, which may run on a different thread.
     */
    private volatile Object _connectionReference;

    /**
     * @param managementAddressSpace the address space whose management node serves this source
     * @param attributes             attribute map; the value stored under {@link ConfiguredObject#NAME}
     *                               is converted with {@link String#valueOf(Object)} and used as the name
     */
    public ProxyMessageSource(final ManagementAddressSpace managementAddressSpace, final Map<String, Object> attributes)
    {
        _name = String.valueOf(attributes.get(ConfiguredObject.NAME));
        _managementAddressSpace = managementAddressSpace;
    }

    /** @return the name taken from the construction attributes */
    @Override
    public String getName()
    {
        return _name;
    }

    /** @return the management address space this source belongs to */
    @Override
    public NamedAddressSpace getAddressSpace()
    {
        return _managementAddressSpace;
    }

    /**
     * Always refuses: publishing to this destination is not supported.
     *
     * @throws AccessControlException unconditionally
     */
    @Override
    public void authorisePublish(final SecurityToken token, final Map<String, Object> arguments)
            throws AccessControlException
    {
        throw new AccessControlException("Sending messages to temporary addresses in a management address space is not supported");
    }

    /**
     * @return a fresh {@link RoutingResult} created for {@code message}; this method adds nothing to it
     */
    @Override
    public <M extends ServerMessage<? extends StorableMessageMetaData>> RoutingResult<M> route(final M message,
                                                                                               final String routingAddress,
                                                                                               final InstanceProperties instanceProperties)
    {
        return new RoutingResult<>(message);
    }

    /** @return always {@code false} */
    @Override
    public boolean isDurable()
    {
        return false;
    }

    /** No-op: publishing links are not tracked. */
    @Override
    public void linkAdded(final MessageSender sender, final PublishingLink link)
    {
    }

    /** No-op: publishing links are not tracked. */
    @Override
    public void linkRemoved(final MessageSender sender, final PublishingLink link)
    {
    }

    /** @return always {@code null}; there is no alternate binding destination */
    @Override
    public MessageDestination getAlternateBindingDestination()
    {
        return null;
    }

    /** No-op: destination referrers are not tracked. */
    @Override
    public void removeReference(final DestinationReferrer destinationReferrer)
    {
    }

    /** No-op: destination referrers are not tracked. */
    @Override
    public void addReference(final DestinationReferrer destinationReferrer)
    {
    }

    /** @return the random UUID generated when this instance was created */
    @Override
    public UUID getId()
    {
        return _id;
    }

    /** @return always {@link MessageDurability#NEVER} */
    @Override
    public MessageDurability getMessageDurability()
    {
        return MessageDurability.NEVER;
    }

    /**
     * Attaches the single permitted consumer by registering a wrapped form of {@code target} with the
     * management node, using this source's name as both target address and consumer name.
     * <p>
     * The connection reference of the first {@link SessionPrincipal} found on the current
     * {@link Subject} is remembered for {@link #verifySessionAccess}.
     *
     * @param consumerName ignored; the name of this source is passed to the management node instead
     * @return the consumer exposed to {@code target}, or {@code null} when the calling context carries
     *         no {@link SessionPrincipal} (in which case nothing is registered)
     * @throws ExistingExclusiveConsumer if a consumer is already attached or is concurrently being attached
     */
    @Override
    public <T extends ConsumerTarget<T>> MessageInstanceConsumer<T> addConsumer(final T target,
                                                                                final FilterManager filters,
                                                                                final Class<? extends ServerMessage> messageClass,
                                                                                final String consumerName,
                                                                                final EnumSet<ConsumerOption> options,
                                                                                final Integer priority)
            throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                   ConsumerAccessRefused, QueueDeleted
    {
        if (!_consumerSet.compareAndSet(false, true))
        {
            throw new ExistingExclusiveConsumer();
        }

        // The slot is now claimed. It must be released again on every path that does not end with a
        // consumer registered on the management node, otherwise this source would reject all future
        // consumers despite having none.
        boolean registered = false;
        try
        {
            // Subject.getSubject returns null when no Subject is associated with the context.
            final Subject currentSubject = Subject.getSubject(AccessController.getContext());
            final Set<SessionPrincipal> sessionPrincipals = currentSubject == null
                    ? Collections.<SessionPrincipal>emptySet()
                    : currentSubject.getPrincipals(SessionPrincipal.class);
            if (sessionPrincipals.isEmpty())
            {
                LOGGER.debug("No session principal available; consumer not added to proxy message source '{}'", _name);
                return null;
            }

            _connectionReference = sessionPrincipals.iterator().next().getSession().getConnectionReference();

            final WrappingTarget<T> wrapper = new WrappingTarget<>(target, _name);
            _managementAddressSpace.getManagementNode()
                                   .addConsumer(wrapper, filters, messageClass, _name, options, priority);
            registered = true;

            // The wrapper builds the caller-facing consumer when consumerAdded is invoked on it.
            final MessageInstanceConsumer<T> consumer = wrapper.getConsumer();
            _consumer = consumer;
            return consumer;
        }
        finally
        {
            if (!registered)
            {
                _consumerSet.set(false);
            }
        }
    }

    /** @return a singleton holding the attached consumer, or an empty set when there is none */
    @Override
    public Collection<? extends MessageInstanceConsumer> getConsumers()
    {
        // Read the volatile field once so a concurrent close cannot yield a singleton containing null.
        final MessageInstanceConsumer<?> consumer = _consumer;
        return consumer == null
                ? Collections.<MessageInstanceConsumer>emptySet()
                : Collections.<MessageInstanceConsumer>singleton(consumer);
    }

    /**
     * @return {@code true} when the session's connection reference is the very same object (identity
     *         comparison) as the one recorded by {@link #addConsumer}
     */
    @Override
    public boolean verifySessionAccess(final AMQPSession<?,?> session)
    {
        return session.getConnectionReference() == _connectionReference;
    }

    /** No-op. */
    @Override
    public void close()
    {
    }

    /** @return always {@link MessageConversionExceptionHandlingPolicy#CLOSE} */
    @Override
    public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
    {
        return MessageConversionExceptionHandlingPolicy.CLOSE;
    }

    /**
     * Consumer target presented to the management node in place of the caller's target. It reports a
     * fixed target address and otherwise delegates to the wrapped target, substituting the
     * caller-facing consumer wherever a consumer is passed along.
     * <p>
     * Deliberately an inner (non-static) class: {@link #close()} updates state of the enclosing
     * {@link ProxyMessageSource}.
     */
    private class WrappingTarget<T extends ConsumerTarget<T>> implements ConsumerTarget<WrappingTarget<T>>
    {
        private final T _underlying;
        private final String _address;

        /** Caller-facing view of the management node's consumer; set in {@link #consumerAdded}. */
        private MessageInstanceConsumer<T> _consumer;

        public WrappingTarget(final T target, String address)
        {
            _underlying = target;
            _address = address;
        }

        /** @return the target supplied by the original caller */
        public T getUnderlying()
        {
            return _underlying;
        }

        /** @return the caller-facing consumer, or {@code null} if {@link #consumerAdded} has not run yet */
        public MessageInstanceConsumer<T> getConsumer()
        {
            return _consumer;
        }

        @Override
        public void acquisitionRemoved(final MessageInstance node)
        {
            _underlying.acquisitionRemoved(node);
        }

        @Override
        public boolean processPending()
        {
            return _underlying.processPending();
        }

        /** @return the address given at construction rather than the wrapped target's address */
        @Override
        public String getTargetAddress()
        {
            return _address;
        }

        /** @return always {@code false} */
        @Override
        public boolean isMultiQueue()
        {
            return false;
        }

        @Override
        public void notifyWork()
        {
            _underlying.notifyWork();
        }

        @Override
        public void updateNotifyWorkDesired()
        {
            _underlying.updateNotifyWorkDesired();
        }

        @Override
        public boolean isNotifyWorkDesired()
        {
            return _underlying.isNotifyWorkDesired();
        }

        @Override
        public State getState()
        {
            return _underlying.getState();
        }

        /**
         * Wraps the management node's consumer so that it exposes the original target, remembers the
         * wrapper, and announces it to the wrapped target.
         */
        @Override
        public void consumerAdded(final MessageInstanceConsumer<WrappingTarget<T>> sub)
        {
            _consumer = new UnwrappingWrappingConsumer<>(sub, this);
            _underlying.consumerAdded(_consumer);
        }

        /** Forwards the removal to the wrapped target, passing the caller-facing consumer instead of {@code sub}. */
        @Override
        public ListenableFuture<Void> consumerRemoved(final MessageInstanceConsumer<WrappingTarget<T>> sub)
        {
            return _underlying.consumerRemoved(_consumer);
        }

        @Override
        public long getUnacknowledgedBytes()
        {
            return _underlying.getUnacknowledgedBytes();
        }

        @Override
        public long getUnacknowledgedMessages()
        {
            return _underlying.getUnacknowledgedMessages();
        }

        @Override
        public AMQPSession getSession()
        {
            return _underlying.getSession();
        }

        /** Forwards the delivery to the wrapped target, passing the caller-facing consumer instead of {@code consumer}. */
        @Override
        public void send(final MessageInstanceConsumer consumer,
                         final MessageInstance entry,
                         final boolean batch)
        {
            _underlying.send(_consumer, entry, batch);
        }

        @Override
        public boolean sendNextMessage()
        {
            return _underlying.sendNextMessage();
        }

        @Override
        public void flushBatched()
        {
            _underlying.flushBatched();
        }

        @Override
        public void noMessagesAvailable()
        {
            _underlying.noMessagesAvailable();
        }

        @Override
        public boolean allocateCredit(final ServerMessage msg)
        {
            return _underlying.allocateCredit(msg);
        }

        @Override
        public void restoreCredit(final ServerMessage queueEntry)
        {
            _underlying.restoreCredit(queueEntry);
        }

        @Override
        public boolean isSuspended()
        {
            return _underlying.isSuspended();
        }

        /**
         * Removes the enclosing proxy source from the management address space, clears the enclosing
         * source's consumer, then closes the wrapped target.
         *
         * @return the result of closing the wrapped target
         */
        @Override
        public boolean close()
        {
            _managementAddressSpace.removeProxyMessageSource(_connectionReference, _name);
            ProxyMessageSource.this._consumer = null;
            return _underlying.close();
        }

        /** Forwards the notification to the wrapped target, passing the caller-facing consumer instead of {@code sub}. */
        @Override
        public void queueDeleted(final Queue queue, final MessageInstanceConsumer sub)
        {
            _underlying.queueDeleted(queue, _consumer);
        }
    }

    /**
     * Caller-facing view of the consumer that the management node created for a {@link WrappingTarget}.
     * Every operation is delegated to that consumer, except {@link #getTarget()}, which reports the
     * caller's original target rather than the wrapper.
     */
    private static class UnwrappingWrappingConsumer<T extends ConsumerTarget<T>> implements MessageInstanceConsumer<T>
    {
        private final MessageInstanceConsumer<WrappingTarget<T>> _underlying;
        private final WrappingTarget<T> _target;

        public UnwrappingWrappingConsumer(final MessageInstanceConsumer<WrappingTarget<T>> sub, WrappingTarget<T> wrappedTarget)
        {
            _underlying = sub;
            _target = wrappedTarget;
        }

        @Override
        public boolean isClosed()
        {
            return _underlying.isClosed();
        }

        @Override
        public boolean acquires()
        {
            return _underlying.acquires();
        }

        @Override
        public String getName()
        {
            return _underlying.getName();
        }

        @Override
        public void close()
        {
            _underlying.close();
        }

        @Override
        public void externalStateChange()
        {
            _underlying.externalStateChange();
        }

        @Override
        public Object getIdentifier()
        {
            return _underlying.getIdentifier();
        }

        @Override
        public MessageContainer pullMessage()
        {
            return _underlying.pullMessage();
        }

        /** @return the original target wrapped by the {@link WrappingTarget}, not the wrapper itself */
        @Override
        public T getTarget()
        {
            return _target.getUnderlying();
        }

        @Override
        public void setNotifyWorkDesired(final boolean desired)
        {
            _underlying.setNotifyWorkDesired(desired);
        }
    }
}