blob: 4188e1a3abbe7bb177a262fb9f1e1e4e23e62852 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.server.virtualhost;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerConnectionLimitProvider;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostConnectionLimitProvider;
import org.apache.qpid.server.security.limit.CachedConnectionLimiterImpl;
import org.apache.qpid.server.security.limit.ConnectionLimitProvider;
import org.apache.qpid.server.security.limit.ConnectionLimiter;
import org.apache.qpid.server.security.limit.ConnectionLimiter.CachedLimiter;
final class VirtualHostConnectionLimiter extends CachedConnectionLimiterImpl implements CachedLimiter
{
private static final Logger LOGGER = LoggerFactory.getLogger(VirtualHostConnectionLimiter.class);
private final VirtualHost<?> _virtualHost;
private final Broker<?> _broker;
private final Map<ConnectionLimitProvider<?>, ConnectionLimiter> _connectionLimitProviders = new ConcurrentHashMap<>();
private final ChangeListener _virtualHostChangeListener;
private final ChangeListener _brokerChangeListener;
VirtualHostConnectionLimiter(VirtualHost<?> virtualHost, Broker<?> broker)
{
super(ConnectionLimiter.noLimits());
_virtualHost = Objects.requireNonNull(virtualHost);
_broker = Objects.requireNonNull(broker);
_virtualHostChangeListener = ChangeListener.virtualHostChangeListener(this);
_brokerChangeListener = ChangeListener.brokerChangeListener(this);
}
public void open()
{
_virtualHost.addChangeListener(_virtualHostChangeListener);
_broker.addChangeListener(_brokerChangeListener);
_virtualHost.getChildren(VirtualHostConnectionLimitProvider.class)
.forEach(child -> child.addChangeListener(ProviderChangeListener.virtualHostChangeListener(this)));
_broker.getChildren(BrokerConnectionLimitProvider.class)
.forEach(child -> child.addChangeListener(ProviderChangeListener.brokerChangeListener(this)));
}
public void activate()
{
update();
}
public void close()
{
_virtualHost.removeChangeListener(_virtualHostChangeListener);
_broker.removeChangeListener(_brokerChangeListener);
final ProviderChangeListener virtualHostChangeListener = ProviderChangeListener.virtualHostChangeListener(this);
_virtualHost.getChildren(VirtualHostConnectionLimitProvider.class)
.forEach(child -> child.removeChangeListener(virtualHostChangeListener));
final ProviderChangeListener brokerChangeListener = ProviderChangeListener.brokerChangeListener(this);
_broker.getChildren(BrokerConnectionLimitProvider.class)
.forEach(child -> child.removeChangeListener(brokerChangeListener));
swapLimiter(ConnectionLimiter.noLimits());
}
private void update(ConfiguredObject<?> object)
{
_connectionLimitProviders.remove(object);
update();
}
private void update()
{
if (!((SystemConfig<?>) _broker.getParent()).isManagementMode())
{
swapLimiter(newLimiter(_connectionLimitProviders));
}
}
private ConnectionLimiter newLimiter(final Map<ConnectionLimitProvider<?>, ConnectionLimiter> cache)
{
ConnectionLimiter limiter = ConnectionLimiter.noLimits();
LOGGER.debug("Updating virtual host connection limiters");
for (final VirtualHostConnectionLimitProvider<?> provider :
_virtualHost.getChildren(VirtualHostConnectionLimitProvider.class))
{
if (provider.getState() == State.ACTIVE)
{
limiter = limiter.append(
cache.computeIfAbsent(provider, ConnectionLimitProvider::getConnectionLimiter));
}
else if (provider.getState() == State.ERRORED)
{
limiter = ConnectionLimiter.blockEveryone();
}
}
LOGGER.debug("Updating broker connection limiters");
for (final BrokerConnectionLimitProvider<?> provider :
_broker.getChildren(BrokerConnectionLimitProvider.class))
{
if (provider.getState() == State.ACTIVE)
{
limiter = limiter.append(
cache.computeIfAbsent(provider, ConnectionLimitProvider::getConnectionLimiter));
}
else if (provider.getState() == State.ERRORED)
{
limiter = ConnectionLimiter.blockEveryone();
}
}
return limiter;
}
private abstract static class AbstractChangeListener extends AbstractConfigurationChangeListener
{
final VirtualHostConnectionLimiter _limiter;
final Class<?> _providerClazz;
AbstractChangeListener(VirtualHostConnectionLimiter limiter, Class<?> providerClazz)
{
super();
_limiter = Objects.requireNonNull(limiter);
_providerClazz = Objects.requireNonNull(providerClazz);
}
void addProvider(ConfiguredObject<?> provider)
{
provider.addChangeListener(new ProviderChangeListener(_limiter, _providerClazz));
_limiter.update();
}
void removeProvider(ConfiguredObject<?> provider)
{
provider.removeChangeListener(new ProviderChangeListener(_limiter, _providerClazz));
_limiter.update(provider);
}
void updateProvider(ConfiguredObject<?> provider)
{
_limiter.update(provider);
}
@Override
public int hashCode()
{
return 31 * _limiter.hashCode() + _providerClazz.hashCode();
}
@Override
public boolean equals(Object obj)
{
if (obj instanceof AbstractChangeListener)
{
final AbstractChangeListener changeListener = (AbstractChangeListener) obj;
return _limiter == changeListener._limiter && _providerClazz == changeListener._providerClazz;
}
return false;
}
}
private static final class ChangeListener extends AbstractChangeListener
{
private final Class<?> _categoryClass;
static ChangeListener virtualHostChangeListener(VirtualHostConnectionLimiter limiter)
{
return new ChangeListener(limiter, VirtualHost.class, VirtualHostConnectionLimitProvider.class);
}
static ChangeListener brokerChangeListener(VirtualHostConnectionLimiter limiter)
{
return new ChangeListener(limiter, Broker.class, BrokerConnectionLimitProvider.class);
}
private ChangeListener(VirtualHostConnectionLimiter limiter,
Class<?> categoryClass, Class<?> childCategoryClass)
{
super(limiter, childCategoryClass);
_categoryClass = categoryClass;
}
@Override
public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
{
super.childAdded(object, child);
if (object.getCategoryClass() == _categoryClass && child.getCategoryClass() == _providerClazz)
{
addProvider(child);
}
}
@Override
public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
{
super.childRemoved(object, child);
if (object.getCategoryClass() == _categoryClass && child.getCategoryClass() == _providerClazz)
{
removeProvider(child);
}
}
}
private static final class ProviderChangeListener extends AbstractChangeListener
{
private final Map<ConfiguredObject<?>, Boolean> _bulkChanges = new ConcurrentHashMap<>();
static ProviderChangeListener virtualHostChangeListener(VirtualHostConnectionLimiter limiter)
{
return new ProviderChangeListener(limiter, VirtualHostConnectionLimitProvider.class);
}
static ProviderChangeListener brokerChangeListener(VirtualHostConnectionLimiter limiter)
{
return new ProviderChangeListener(limiter, BrokerConnectionLimitProvider.class);
}
ProviderChangeListener(VirtualHostConnectionLimiter limiter, Class<?> clazz)
{
super(limiter, clazz);
}
@Override
public void attributeSet(final ConfiguredObject<?> object,
final String attributeName,
final Object oldAttributeValue,
final Object newAttributeValue)
{
super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue);
if (object.getCategoryClass() == _providerClazz && !_bulkChanges.containsKey(object))
{
updateProvider(object);
}
}
@Override
public void bulkChangeStart(final ConfiguredObject<?> object)
{
super.bulkChangeStart(object);
_bulkChanges.put(object, Boolean.TRUE);
}
@Override
public void bulkChangeEnd(final ConfiguredObject<?> object)
{
super.bulkChangeEnd(object);
if (Optional.ofNullable(_bulkChanges.remove(object)).orElse(Boolean.FALSE))
{
updateProvider(object);
}
}
}
}