blob: 56e28bd870d29bccf2823c2f87ccbb6c2938a3ca [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.virtualhost;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.DefaultDestination;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.GenericRecoverer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.MapValueConverter;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener
{
private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
public static final String DLQ_ROUTING_KEY = "dlq";
public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change
private static final int MAX_LENGTH = 255;
private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class);
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
private ScheduledThreadPoolExecutor _houseKeepingTasks;
private final Broker<?> _broker;
private final ConnectionRegistry _connectionRegistry;
private final DtxRegistry _dtxRegistry;
private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
private boolean _blocked;
private final Map<String, MessageDestination> _systemNodeDestinations =
Collections.synchronizedMap(new HashMap<String,MessageDestination>());
private final Map<String, MessageSource> _systemNodeSources =
Collections.synchronizedMap(new HashMap<String,MessageSource>());
private final EventLogger _eventLogger;
private final List<VirtualHostAlias> _aliases = new ArrayList<VirtualHostAlias>();
private final AtomicBoolean _deleted = new AtomicBoolean();
private final VirtualHostNode<?> _virtualHostNode;
private final AtomicLong _targetSize = new AtomicLong(1024*1024);
private MessageStoreLogSubject _messageStoreLogSubject;
@ManagedAttributeField
private boolean _queue_deadLetterQueueEnabled;
@ManagedAttributeField
private long _housekeepingCheckPeriod;
@ManagedAttributeField
private long _storeTransactionIdleTimeoutClose;
@ManagedAttributeField
private long _storeTransactionIdleTimeoutWarn;
@ManagedAttributeField
private long _storeTransactionOpenTimeoutClose;
@ManagedAttributeField
private long _storeTransactionOpenTimeoutWarn;
@ManagedAttributeField
private int _housekeepingThreadCount;
private boolean _useAsyncRecoverer;
private MessageDestination _defaultDestination;
private MessageStore _messageStore;
public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
super(parentsMap(virtualHostNode), attributes);
_broker = virtualHostNode.getParent(Broker.class);
_virtualHostNode = virtualHostNode;
_dtxRegistry = new DtxRegistry();
_eventLogger = _broker.getParent(SystemConfig.class).getEventLogger();
_eventLogger.message(VirtualHostMessages.CREATED(getName()));
_connectionRegistry = new ConnectionRegistry();
_connectionRegistry.addRegistryChangeListener(this);
_defaultDestination = new DefaultDestination(this);
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
_dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
_messagesReceived = new StatisticsCounter("messages-received-" + getName());
_dataReceived = new StatisticsCounter("bytes-received-" + getName());
}
public void onValidate()
{
super.onValidate();
String name = getName();
if (name == null || "".equals(name.trim()))
{
throw new IllegalConfigurationException("Virtual host name must be specified");
}
String type = getType();
if (type == null || "".equals(type.trim()))
{
throw new IllegalConfigurationException("Virtual host type must be specified");
}
if(!isDurable())
{
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
}
@Override
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
if(changedAttributes.contains(DURABLE) && !proxyForValidation.isDurable())
{
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
if (changedAttributes.contains(DESIRED_STATE))
{
if (State.DELETED == proxyForValidation.getDesiredState()
&& getName().equals(_broker.getDefaultVirtualHost()))
{
throw new IntegrityViolationException("Cannot delete default virtual host '" + getName() + "'");
}
}
}
@Override
public MessageStore getMessageStore()
{
return _messageStore;
}
@Override
public void validateOnCreate()
{
super.validateOnCreate();
validateMessageStoreCreation();
}
private void validateMessageStoreCreation()
{
MessageStore store = createMessageStore();
if (store != null)
{
try
{
store.openMessageStore(this);
}
catch (Exception e)
{
throw new IllegalConfigurationException("Cannot open virtual host message store:" + e.getMessage(), e);
}
finally
{
try
{
store.closeMessageStore();
}
catch(Exception e)
{
_logger.warn("Failed to close database", e);
}
}
}
}
@Override
protected void onExceptionInOpen(RuntimeException e)
{
super.onExceptionInOpen(e);
closeMessageStore();
}
@Override
protected void onOpen()
{
super.onOpen();
registerSystemNodes();
_messageStore = createMessageStore();
_messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
_messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
_messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
addChangeListener(new StoreUpdatingChangeListener());
}
private void checkVHostStateIsActive()
{
if (getState() != State.ACTIVE)
{
throw new IllegalStateException("The virtual host state of " + getState()
+ " does not permit this operation.");
}
}
private void registerSystemNodes()
{
QpidServiceLoader qpidServiceLoader = new QpidServiceLoader();
Iterable<SystemNodeCreator> factories = qpidServiceLoader.instancesOf(SystemNodeCreator.class);
for(SystemNodeCreator creator : factories)
{
creator.register(_systemNodeRegistry);
}
}
protected abstract MessageStore createMessageStore();
protected boolean isStoreEmpty()
{
final IsStoreEmptyHandler isStoreEmptyHandler = new IsStoreEmptyHandler();
getDurableConfigurationStore().visitConfiguredObjectRecords(isStoreEmptyHandler);
return isStoreEmptyHandler.isEmpty();
}
protected void createDefaultExchanges()
{
Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>()
{
@Override
public Void run()
{
addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
return null;
}
void addStandardExchange(String name, String type)
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Exchange.NAME, name);
attributes.put(Exchange.TYPE, type);
attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName()));
childAdded(addExchange(attributes));
}
});
}
protected MessageStoreLogSubject getMessageStoreLogSubject()
{
return _messageStoreLogSubject;
}
public IConnectionRegistry getConnectionRegistry()
{
return _connectionRegistry;
}
@Override
protected void authoriseSetDesiredState(State desiredState) throws AccessControlException
{
if(desiredState == State.DELETED)
{
_broker.getSecurityManager().authoriseVirtualHost(getName(), Operation.DELETE);
}
else
{
_broker.getSecurityManager().authoriseVirtualHost(getName(), Operation.UPDATE);
}
}
@Override
protected void authoriseSetAttributes(ConfiguredObject<?> modified, Set<String> attributes) throws AccessControlException
{
_broker.getSecurityManager().authoriseVirtualHost(getName(), Operation.UPDATE);
}
public Collection<Connection> getConnections()
{
return getChildren(Connection.class);
}
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
if(clazz == VirtualHostAlias.class)
{
return (Collection<C>) getAliases();
}
else
{
return super.getChildren(clazz);
}
}
@Override
protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
checkVHostStateIsActive();
if(childClass == Exchange.class)
{
return (C) addExchange(attributes);
}
else if(childClass == Queue.class)
{
return (C) addQueue(attributes);
}
else if(childClass == VirtualHostAlias.class)
{
throw new UnsupportedOperationException();
}
else if(childClass == Connection.class)
{
throw new UnsupportedOperationException();
}
throw new IllegalArgumentException("Cannot create a child of class " + childClass.getSimpleName());
}
public Collection<String> getExchangeTypeNames()
{
return getObjectFactory().getSupportedTypes(Exchange.class);
}
@Override
public EventLogger getEventLogger()
{
return _eventLogger;
}
/**
* Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
* and checking for idle or open transactions that have exceeded the permitted thresholds.
*
* @param period
*/
private void initialiseHouseKeeping(long period)
{
if (period != 0L)
{
scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
}
}
protected void shutdownHouseKeeping()
{
if(_houseKeepingTasks != null)
{
_houseKeepingTasks.shutdown();
try
{
if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
{
_houseKeepingTasks.shutdownNow();
}
}
catch (InterruptedException e)
{
_logger.warn("Interrupted during Housekeeping shutdown:", e);
Thread.currentThread().interrupt();
}
}
}
protected void removeHouseKeepingTasks()
{
BlockingQueue<Runnable> taskQueue = _houseKeepingTasks.getQueue();
for (final Runnable runnable : taskQueue)
{
_houseKeepingTasks.remove(runnable);
}
}
/**
* Allow other broker components to register a HouseKeepingTask
*
* @param period How often this task should run, in ms.
* @param task The task to run.
*/
public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
{
_houseKeepingTasks.scheduleAtFixedRate(task, period / 2, period,
TimeUnit.MILLISECONDS);
}
public ScheduledFuture<?> scheduleTask(long delay, Runnable task)
{
return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS);
}
public long getHouseKeepingTaskCount()
{
return _houseKeepingTasks.getTaskCount();
}
public long getHouseKeepingCompletedTaskCount()
{
return _houseKeepingTasks.getCompletedTaskCount();
}
public int getHouseKeepingPoolSize()
{
return _houseKeepingTasks.getCorePoolSize();
}
public void setHouseKeepingPoolSize(int newSize)
{
_houseKeepingTasks.setCorePoolSize(newSize);
}
public int getHouseKeepingActiveCount()
{
return _houseKeepingTasks.getActiveCount();
}
@Override
public AMQQueue<?> getQueue(String name)
{
return (AMQQueue<?>) getChildByName(Queue.class, name);
}
@Override
public MessageSource getMessageSource(final String name)
{
MessageSource systemSource = _systemNodeSources.get(name);
return systemSource == null ? getQueue(name) : systemSource;
}
@Override
public AMQQueue<?> getQueue(UUID id)
{
return (AMQQueue<?>) getChildById(Queue.class, id);
}
@Override
public Collection<AMQQueue<?>> getQueues()
{
Collection children = getChildren(Queue.class);
return children;
}
@Override
public int removeQueue(AMQQueue<?> queue)
{
int purged = queue.deleteAndReturnCount();
if (queue.isDurable() && !(queue.getLifetimePolicy()
== LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
|| queue.getLifetimePolicy()
== LifetimePolicy.DELETE_ON_SESSION_END))
{
DurableConfigurationStore store = getDurableConfigurationStore();
store.remove(queue.asObjectRecord());
}
return purged;
}
public AMQQueue<?> createQueue(Map<String, Object> attributes) throws QueueExistsException
{
checkVHostStateIsActive();
AMQQueue<?> queue = addQueue(attributes);
childAdded(queue);
return queue;
}
private AMQQueue<?> addQueue(Map<String, Object> attributes) throws QueueExistsException
{
if (shouldCreateDLQ(attributes))
{
// TODO - this isn't really correct - what if the name has ${foo} in it?
String queueName = String.valueOf(attributes.get(Queue.NAME));
validateDLNames(queueName);
String altExchangeName = createDLQ(queueName);
attributes = new LinkedHashMap<String, Object>(attributes);
attributes.put(Queue.ALTERNATE_EXCHANGE, altExchangeName);
}
return addQueueWithoutDLQ(attributes);
}
private AMQQueue<?> addQueueWithoutDLQ(Map<String, Object> attributes) throws QueueExistsException
{
try
{
return (AMQQueue) getObjectFactory().create(Queue.class, attributes, this);
}
catch (DuplicateNameException e)
{
throw new QueueExistsException(getQueue(e.getName()));
}
}
@Override
public MessageDestination getMessageDestination(final String name)
{
MessageDestination destination = _systemNodeDestinations.get(name);
return destination == null ? getExchange(name) : destination;
}
@Override
public ExchangeImpl getExchange(String name)
{
return getChildByName(ExchangeImpl.class,name);
}
@Override
public ExchangeImpl getExchange(UUID id)
{
return getChildById(ExchangeImpl.class, id);
}
@Override
public MessageDestination getDefaultDestination()
{
return _defaultDestination;
}
@Override
public Collection<ExchangeImpl<?>> getExchanges()
{
Collection children = getChildren(Exchange.class);
return children;
}
@Override
public ExchangeImpl createExchange(Map<String,Object> attributes)
throws ExchangeExistsException, ReservedExchangeNameException,
NoFactoryForTypeException
{
checkVHostStateIsActive();
ExchangeImpl child = addExchange(attributes);
childAdded(child);
return child;
}
private ExchangeImpl addExchange(Map<String,Object> attributes)
throws ExchangeExistsException, ReservedExchangeNameException,
NoFactoryForTypeException
{
try
{
return (ExchangeImpl) getObjectFactory().create(Exchange.class, attributes, this);
}
catch (DuplicateNameException e)
{
throw new ExchangeExistsException(getExchange(e.getName()));
}
}
@Override
public void removeExchange(ExchangeImpl exchange, boolean force)
throws ExchangeIsAlternateException, RequiredExchangeException
{
exchange.deleteWithChecks();
}
public SecurityManager getSecurityManager()
{
return _broker.getSecurityManager();
}
protected void onClose()
{
setState(State.UNAVAILABLE);
//Stop Connections
_connectionRegistry.close();
_dtxRegistry.close();
closeMessageStore();
shutdownHouseKeeping();
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
}
private void closeMessageStore()
{
if (getMessageStore() != null)
{
try
{
getMessageStore().closeMessageStore();
}
catch (StoreException e)
{
_logger.error("Failed to close message store", e);
}
if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider))
{
getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CLOSED());
}
}
}
public void registerMessageDelivered(long messageSize)
{
_messagesDelivered.registerEvent(1L);
_dataDelivered.registerEvent(messageSize);
_broker.registerMessageDelivered(messageSize);
}
public void registerMessageReceived(long messageSize, long timestamp)
{
_messagesReceived.registerEvent(1L, timestamp);
_dataReceived.registerEvent(messageSize, timestamp);
_broker.registerMessageReceived(messageSize, timestamp);
}
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
public void resetStatistics()
{
_messagesDelivered.reset();
_dataDelivered.reset();
_messagesReceived.reset();
_dataReceived.reset();
for (AMQConnectionModel connection : _connectionRegistry.getConnections())
{
connection.resetStatistics();
}
}
public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
{
LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
if(linkRegistry == null)
{
linkRegistry = new LinkRegistry();
_linkRegistry.put(remoteContainerId, linkRegistry);
}
return linkRegistry;
}
public DtxRegistry getDtxRegistry()
{
return _dtxRegistry;
}
public void block()
{
synchronized (_connectionRegistry)
{
if(!_blocked)
{
_blocked = true;
for(AMQConnectionModel conn : _connectionRegistry.getConnections())
{
conn.block();
}
}
}
}
public void unblock()
{
synchronized (_connectionRegistry)
{
if(_blocked)
{
_blocked = false;
for(AMQConnectionModel conn : _connectionRegistry.getConnections())
{
conn.unblock();
}
}
}
}
public void connectionRegistered(final AMQConnectionModel connection)
{
if(_blocked)
{
connection.block();
}
ConnectionAdapter c = new ConnectionAdapter(connection);
c.create();
childAdded(c);
}
public void connectionUnregistered(final AMQConnectionModel connection)
{
// ConnectionAdapter installs delete task to cause connection model object to delete
}
public void event(final Event event)
{
switch(event)
{
case PERSISTENT_MESSAGE_SIZE_OVERFULL:
block();
_eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.OVERFULL());
break;
case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
unblock();
_eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.UNDERFULL());
break;
}
}
protected void reportIfError(State state)
{
if (state == State.ERRORED)
{
_eventLogger.message(VirtualHostMessages.ERRORED(getName()));
}
}
private static class IsStoreEmptyHandler implements ConfiguredObjectRecordHandler
{
private boolean _empty = true;
@Override
public void begin()
{
}
@Override
public boolean handle(final ConfiguredObjectRecord record)
{
// if there is a non vhost record then the store is not empty and we can stop looking at the records
_empty = record.getType().equals(VirtualHost.class.getSimpleName());
return _empty;
}
@Override
public void end()
{
}
public boolean isEmpty()
{
return _empty;
}
}
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
{
super(AbstractVirtualHost.this);
}
public void execute()
{
VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
Broker<?> broker = virtualHostNode.getParent(Broker.class);
broker.assignTargetSizes();
for (AMQQueue<?> q : getQueues())
{
if (q.getState() == State.ACTIVE)
{
if (_logger.isDebugEnabled())
{
_logger.debug("Checking message status for queue: "
+ q.getName());
}
try
{
q.checkMessageStatus();
}
catch (Exception e)
{
_logger.error("Exception in housekeeping for queue: " + q.getName(), e);
//Don't throw exceptions as this will stop the
// house keeping task from running.
}
}
}
for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections())
{
if (_logger.isDebugEnabled())
{
_logger.debug("Checking for long running open transactions on connection " + connection);
}
for (AMQSessionModel<?,?> session : connection.getSessionModels())
{
if (_logger.isDebugEnabled())
{
_logger.debug("Checking for long running open transactions on session " + session);
}
try
{
session.checkTransactionStatus(getStoreTransactionOpenTimeoutWarn(),
getStoreTransactionOpenTimeoutClose(),
getStoreTransactionIdleTimeoutWarn(),
getStoreTransactionIdleTimeoutClose());
} catch (Exception e)
{
_logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
}
}
}
}
}
private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry
{
@Override
public void registerSystemNode(final MessageNode node)
{
if(node instanceof MessageDestination)
{
_systemNodeDestinations.put(node.getName(), (MessageDestination) node);
}
if(node instanceof MessageSource)
{
_systemNodeSources.put(node.getName(), (MessageSource)node);
}
}
@Override
public void removeSystemNode(final MessageNode node)
{
if(node instanceof MessageDestination)
{
_systemNodeDestinations.remove(node.getName());
}
if(node instanceof MessageSource)
{
_systemNodeSources.remove(node.getName());
}
}
@Override
public VirtualHostImpl getVirtualHost()
{
return AbstractVirtualHost.this;
}
}
@Override
public boolean getDefaultDeadLetterQueueEnabled()
{
return isQueue_deadLetterQueueEnabled();
}
public void executeTransaction(TransactionalOperation op)
{
MessageStore store = getMessageStore();
final LocalTransaction txn = new LocalTransaction(store);
op.withinTransaction(new Transaction()
{
public void dequeue(final MessageInstance messageInstance)
{
boolean acquired = messageInstance.acquire();
if(!acquired && messageInstance instanceof QueueEntry)
{
QueueEntry entry = (QueueEntry) messageInstance;
QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer();
acquired = messageInstance.removeAcquisitionFromConsumer(consumer);
if(acquired)
{
consumer.acquisitionRemoved((QueueEntry)messageInstance);
}
}
if(acquired)
{
txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
messageInstance.delete();
}
public void onRollback()
{
}
});
}
}
public void copy(MessageInstance entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = (AMQQueue)queue;
txn.enqueue(toQueue, message, new ServerTransaction.Action()
{
public void postCommit()
{
toQueue.enqueue(message, null);
}
public void onRollback()
{
}
});
}
public void move(final MessageInstance entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = (AMQQueue)queue;
if(entry.acquire())
{
txn.enqueue(toQueue, message,
new ServerTransaction.Action()
{
public void postCommit()
{
toQueue.enqueue(message, null);
}
public void onRollback()
{
entry.release();
}
});
txn.dequeue(entry.getOwningResource(), message,
new ServerTransaction.Action()
{
public void postCommit()
{
entry.delete();
}
public void onRollback()
{
}
});
}
}
});
txn.commit();
}
@Override
public Collection<String> getSupportedExchangeTypes()
{
return getObjectFactory().getSupportedTypes(Exchange.class);
}
@Override
public Collection<String> getSupportedQueueTypes()
{
return getObjectFactory().getSupportedTypes(Queue.class);
}
@Override
public boolean isQueue_deadLetterQueueEnabled()
{
return _queue_deadLetterQueueEnabled;
}
@Override
public long getHousekeepingCheckPeriod()
{
return _housekeepingCheckPeriod;
}
@Override
public long getStoreTransactionIdleTimeoutClose()
{
return _storeTransactionIdleTimeoutClose;
}
@Override
public long getStoreTransactionIdleTimeoutWarn()
{
return _storeTransactionIdleTimeoutWarn;
}
@Override
public long getStoreTransactionOpenTimeoutClose()
{
return _storeTransactionOpenTimeoutClose;
}
@Override
public long getStoreTransactionOpenTimeoutWarn()
{
return _storeTransactionOpenTimeoutWarn;
}
@Override
public long getQueueCount()
{
return getQueues().size();
}
@Override
public long getExchangeCount()
{
return getExchanges().size();
}
@Override
public long getConnectionCount()
{
return getConnectionRegistry().getConnections().size();
}
@Override
public long getBytesIn()
{
return getDataReceiptStatistics().getTotal();
}
@Override
public long getBytesOut()
{
return getDataDeliveryStatistics().getTotal();
}
@Override
public long getMessagesIn()
{
return getMessageReceiptStatistics().getTotal();
}
@Override
public long getMessagesOut()
{
return getMessageDeliveryStatistics().getTotal();
}
@Override
public int getHousekeepingThreadCount()
{
return _housekeepingThreadCount;
}
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
protected void doStop()
{
closeChildren();
shutdownHouseKeeping();
closeMessageStore();
setState(State.STOPPED);
}
@StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED )
private void doDelete()
{
if(_deleted.compareAndSet(false,true))
{
String hostName = getName();
close();
MessageStore ms = getMessageStore();
if (ms != null)
{
try
{
ms.onDelete(this);
}
catch (Exception e)
{
_logger.warn("Exception occurred on message store deletion", e);
}
}
deleted();
setState(State.DELETED);
}
}
public Collection<VirtualHostAlias> getAliases()
{
return Collections.unmodifiableCollection(_aliases);
}
private String createDLQ(final String queueName)
{
final String dlExchangeName = getDeadLetterExchangeName(queueName);
final String dlQueueName = getDeadLetterQueueName(queueName);
ExchangeImpl dlExchange = null;
final UUID dlExchangeId = UUID.randomUUID();
try
{
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId);
attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName);
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
dlExchange = createExchange(attributes);
}
catch(ExchangeExistsException e)
{
// We're ok if the exchange already exists
dlExchange = e.getExistingExchange();
}
catch (ReservedExchangeNameException | NoFactoryForTypeException | UnknownConfiguredObjectException e)
{
throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
}
AMQQueue dlQueue = null;
{
dlQueue = getQueue(dlQueueName);
if(dlQueue == null)
{
//set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
final Map<String, Object> args = new HashMap<String, Object>();
args.put(CREATE_DLQ_ON_CREATION, false);
args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
try
{
args.put(Queue.ID, UUID.randomUUID());
args.put(Queue.NAME, dlQueueName);
args.put(Queue.DURABLE, true);
dlQueue = addQueueWithoutDLQ(args);
childAdded(dlQueue);
}
catch (QueueExistsException e)
{
// TODO - currently theoretically for two threads to be creating a queue at the same time.
// All model changing operations should be moved to the task executor of the virtual host
}
}
}
//ensure the queue is bound to the exchange
if(!dlExchange.isBound(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue))
{
//actual routing key used does not matter due to use of fanout exchange,
//but we will make the key 'dlq' as it can be logged at creation.
dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue, null);
}
return dlExchangeName;
}
private static void validateDLNames(String name)
{
// check if DLQ name and DLQ exchange name do not exceed 255
String exchangeName = getDeadLetterExchangeName(name);
if (exchangeName.length() > MAX_LENGTH)
{
throw new IllegalArgumentException("DL exchange name '" + exchangeName
+ "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name);
}
String queueName = getDeadLetterQueueName(name);
if (queueName.length() > MAX_LENGTH)
{
throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
+ MAX_LENGTH + " characters for queue " + name);
}
}
private boolean shouldCreateDLQ(Map<String, Object> arguments)
{
boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
Queue.LIFETIME_POLICY,
arguments,
LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
//feature is not to be enabled for temporary queues or when explicitly disabled by argument
if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
{
boolean dlqArgumentPresent = arguments != null
&& arguments.containsKey(CREATE_DLQ_ON_CREATION);
if (dlqArgumentPresent)
{
boolean dlqEnabled = true;
if (dlqArgumentPresent)
{
Object argument = arguments.get(CREATE_DLQ_ON_CREATION);
dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue())
|| (argument instanceof String && Boolean.parseBoolean(argument.toString()));
}
return dlqEnabled;
}
return isQueue_deadLetterQueueEnabled();
}
return false;
}
private static String getDeadLetterQueueName(String name)
{
return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
}
private static String getDeadLetterExchangeName(String name)
{
return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
}
@Override
public String getModelVersion()
{
return BrokerModel.MODEL_VERSION;
}
@Override
public DurableConfigurationStore getDurableConfigurationStore()
{
return _virtualHostNode.getConfigurationStore();
}
@Override
public void setTargetSize(final long targetSize)
{
_targetSize.set(targetSize);
allocateTargetSizeToQueues();
}
private void allocateTargetSizeToQueues()
{
long targetSize = _targetSize.get();
Collection<AMQQueue<?>> queues = getQueues();
long totalSize = calculateTotalEnqueuedSize(queues);
if(targetSize > 0l)
{
for (AMQQueue<?> q : queues)
{
long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize))
* (double) targetSize);
q.setTargetSize(size);
}
}
}
@Override
public long getTotalQueueDepthBytes()
{
return calculateTotalEnqueuedSize(getQueues());
}
private long calculateTotalEnqueuedSize(final Collection<AMQQueue<?>> queues)
{
long total = 0;
for(AMQQueue<?> queue : queues)
{
total += queue.getPotentialMemoryFootprint();
}
return total;
}
@Override
protected void onCreate()
{
super.onCreate();
ConfiguredObjectRecord record = asObjectRecord();
getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes()));
}
@StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE )
private void onActivate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
MessageStore messageStore = getMessageStore();
messageStore.openMessageStore(this);
if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider))
{
getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CREATED());
getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.STORE_LOCATION(messageStore.getStoreLocation()));
}
messageStore.upgradeStoreStructure();
if (isStoreEmpty())
{
createDefaultExchanges();
}
MessageStoreRecoverer messageStoreRecoverer;
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
}
else
{
messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
}
messageStoreRecoverer.recover(this);
State finalState = State.ERRORED;
try
{
initialiseHouseKeeping(getHousekeepingCheckPeriod());
finalState = State.ACTIVE;
}
finally
{
setState(finalState);
reportIfError(getState());
}
}
@StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
private void onRestart()
{
resetStatistics();
final List<ConfiguredObjectRecord> records = new ArrayList<>();
// Transitioning to STOPPED will have closed all our children. Now we are transition
// back to ACTIVE, we need to recover and re-open them.
getDurableConfigurationStore().visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler()
{
@Override
public void begin()
{
}
@Override
public boolean handle(final ConfiguredObjectRecord record)
{
records.add(record);
return true;
}
@Override
public void end()
{
}
});
new GenericRecoverer(this).recover(records);
Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
{
@Override
public Object run()
{
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
public void performAction(final ConfiguredObject<?> object)
{
object.open();
}
});
return null;
}
});
onActivate();
}
private class StoreUpdatingChangeListener implements ConfigurationChangeListener
{
@Override
public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
{
if (object == AbstractVirtualHost.this && isDurable() && newState == State.DELETED)
{
getDurableConfigurationStore().remove(asObjectRecord());
object.removeChangeListener(this);
}
}
@Override
public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
{
}
@Override
public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
{
}
@Override
public void attributeSet(final ConfiguredObject<?> object,
final String attributeName,
final Object oldAttributeValue,
final Object newAttributeValue)
{
if (object == AbstractVirtualHost.this && isDurable() && getState() != State.DELETED && isAttributePersisted(attributeName)
&& !(attributeName.equals(VirtualHost.DESIRED_STATE) && newAttributeValue.equals(State.DELETED)))
{
getDurableConfigurationStore().update(false, asObjectRecord());
}
}
}
}