blob: fe66a6d341f3020645ad89125543a814cab747ca [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.binding;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.BindingConfig;
import org.apache.qpid.server.configuration.BindingConfigType;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class BindingFactory
{
private final VirtualHost _virtualHost;
private final DurableConfigurationStore.Source _configSource;
private final Exchange _defaultExchange;
private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindings = new ConcurrentHashMap<BindingImpl, BindingImpl>();
public BindingFactory(final VirtualHost vhost)
{
this(vhost, vhost.getExchangeRegistry().getDefaultExchange());
}
public BindingFactory(final DurableConfigurationStore.Source configSource, final Exchange defaultExchange)
{
_configSource = configSource;
_defaultExchange = defaultExchange;
if (configSource instanceof VirtualHost)
{
_virtualHost = (VirtualHost) configSource;
}
else
{
_virtualHost = null;
}
}
public VirtualHost getVirtualHost()
{
return _virtualHost;
}
private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task, BindingConfig
{
private final BindingLogSubject _logSubject;
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
private BindingImpl(String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
{
super(queue.getVirtualHost().getConfigStore().createId(), bindingKey, queue, exchange, arguments);
_logSubject = new BindingLogSubject(bindingKey,exchange,queue);
}
public void doTask(final AMQQueue queue) throws AMQException
{
removeBinding(this);
}
public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
{
removeBinding(this);
}
void logCreation()
{
CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), getArguments() != null && !getArguments().isEmpty()));
}
void logDestruction()
{
CurrentActor.get().message(_logSubject, BindingMessages.DELETED());
}
public String getOrigin()
{
return (String) getArguments().get("qpid.fed.origin");
}
public long getCreateTime()
{
return _createTime;
}
public BindingConfigType getConfigType()
{
return BindingConfigType.getInstance();
}
public ConfiguredObject getParent()
{
return _virtualHost;
}
public boolean isDurable()
{
return getQueue().isDurable() && getExchange().isDurable();
}
}
public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
return makeBinding(bindingKey, queue, exchange, arguments, false, false);
}
public boolean replaceBinding(final String bindingKey,
final AMQQueue queue,
final Exchange exchange,
final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
return makeBinding(bindingKey, queue, exchange, arguments, false, true);
}
private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
if (bindingKey == null)
{
bindingKey = "";
}
if (exchange == null)
{
exchange = _defaultExchange;
}
if (arguments == null)
{
arguments = Collections.emptyMap();
}
// The default exchange bindings must reflect the existence of queues, allow
// all operations on it to succeed. It is up to the broker to prevent illegal
// attempts at binding to this exchange, not the ACLs.
if(exchange != _defaultExchange)
{
//Perform ACLs
if (!getVirtualHost().getSecurityManager().authoriseBind(exchange, queue, new AMQShortString(bindingKey)))
{
throw new AMQSecurityException("Permission denied: binding " + bindingKey);
}
}
BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
BindingImpl existingMapping = _bindings.putIfAbsent(b,b);
if (existingMapping == null || force)
{
if (existingMapping != null)
{
removeBinding(existingMapping);
}
if (b.isDurable() && !restore)
{
_configSource.getDurableConfigurationStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
}
queue.addQueueDeleteTask(b);
exchange.addCloseTask(b);
queue.addBinding(b);
exchange.addBinding(b);
getConfigStore().addConfiguredObject(b);
b.logCreation();
return true;
}
else
{
return false;
}
}
private ConfigStore getConfigStore()
{
return getVirtualHost().getConfigStore();
}
public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
{
makeBinding(bindingKey,queue,exchange,argumentMap,true, false);
}
public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException
{
removeBinding(b.getBindingKey(), b.getQueue(), b.getExchange(), b.getArguments());
}
public Binding removeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
if (bindingKey == null)
{
bindingKey = "";
}
if (exchange == null)
{
exchange = _defaultExchange;
}
if (arguments == null)
{
arguments = Collections.emptyMap();
}
// The default exchange bindings must reflect the existence of queues, allow
// all operations on it to succeed. It is up to the broker to prevent illegal
// attempts at binding to this exchange, not the ACLs.
if(exchange != _defaultExchange)
{
// Check access
if (!getVirtualHost().getSecurityManager().authoriseUnbind(exchange, new AMQShortString(bindingKey), queue))
{
throw new AMQSecurityException("Permission denied: unbinding " + bindingKey);
}
}
BindingImpl b = _bindings.remove(new BindingImpl(bindingKey,queue,exchange,arguments));
if (b != null)
{
exchange.removeBinding(b);
queue.removeBinding(b);
exchange.removeCloseTask(b);
queue.removeQueueDeleteTask(b);
if (b.isDurable())
{
_configSource.getDurableConfigurationStore().unbindQueue(exchange,
new AMQShortString(bindingKey),
queue,
FieldTable.convertToFieldTable(arguments));
}
b.logDestruction();
getConfigStore().removeConfiguredObject(b);
}
return b;
}
public Binding getBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments)
{
assert queue != null;
if(bindingKey == null)
{
bindingKey = "";
}
if(exchange == null)
{
exchange = _defaultExchange;
}
if(arguments == null)
{
arguments = Collections.emptyMap();
}
BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
return _bindings.get(b);
}
}