blob: 4cd276a0d0459b97fee29c1cbe4506a9f0d89935 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.virtualhost;
import java.security.AccessControlContext;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Content;
import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManageableMessage;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedOperation;
import org.apache.qpid.server.model.ManagedStatistic;
import org.apache.qpid.server.model.Param;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.StatisticType;
import org.apache.qpid.server.model.StatisticUnit;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.SocketConnectionMetaData;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.preferences.UserPreferencesCreator;
public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>> extends VirtualHost<X>,
EventListener,
StatisticsGatherer,
UserPreferencesCreator,
EventLoggerProvider,
CacheProvider
{
String HOUSEKEEPING_CHECK_PERIOD = "housekeepingCheckPeriod";
String STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "storeTransactionIdleTimeoutClose";
String STORE_TRANSACTION_IDLE_TIMEOUT_WARN = "storeTransactionIdleTimeoutWarn";
String STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = "storeTransactionOpenTimeoutClose";
String STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "storeTransactionOpenTimeoutWarn";
String HOUSE_KEEPING_THREAD_COUNT = "houseKeepingThreadCount";
String ENABLED_CONNECTION_VALIDATORS = "enabledConnectionValidators";
String DISABLED_CONNECTION_VALIDATORS = "disabledConnectionValidators";
String NUMBER_OF_SELECTORS = "numberOfSelectors";
String CONNECTION_THREAD_POOL_SIZE = "connectionThreadPoolSize";
String GLOBAL_ADDRESS_DOMAINS = "globalAddressDomains";
String NODE_AUTO_CREATION_POLICIES = "nodeAutoCreationPolicies";
String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod";
@ManagedContextDefault( name = "virtualhost.housekeepingCheckPeriod")
long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000L;
String FLOW_TO_DISK_CHECK_PERIOD = "virtualhost.flowToDiskCheckPeriod";
@ManagedContextDefault(name = FLOW_TO_DISK_CHECK_PERIOD)
long DEFAULT_FLOW_TO_DISK_CHECK_PERIOD = 30000L;
String CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT = "connectionThreadPoolKeepAliveTimeout";
@SuppressWarnings("unused")
@ManagedContextDefault(name = QueueManagingVirtualHost.CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT)
long DEFAULT_CONNECTION_THREAD_POOL_KEEP_ALIVE_TIMEOUT = 60; // Minutes
@ManagedContextDefault( name = "virtualhost.storeTransactionIdleTimeoutClose")
public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = 0l;
@ManagedContextDefault( name = "virtualhost.housekeepingThreadCount")
int DEFAULT_HOUSEKEEPING_THREAD_COUNT = 4;
String VIRTUALHOST_STATISTICS_REPORING_PERIOD = "virtualhost.statisticsReportingPeriod";
@ManagedContextDefault(name = VIRTUALHOST_STATISTICS_REPORING_PERIOD)
int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
String DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH = "qpid.jms.discardGlobalSharedSubscriptionLinksOnDetach";
@SuppressWarnings("unused")
@ManagedContextDefault(name = DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH,
description = "If true AMQP 1.0 links of global shared subscriptions are discarded when the"
+ " link detaches. This is to avoid leaking links with the Qpid JMS client.")
boolean DEFAULT_DISCARD_GLOBAL_SHARED_SUBSCRIPTION_LINKS_ON_DETACH = true;
@ManagedAttribute( defaultValue = "${" + VIRTUALHOST_STATISTICS_REPORING_PERIOD + "}", description = "Period (in seconds) of the statistic report.")
int getStatisticsReportingPeriod();
@ManagedAttribute( defaultValue = "${virtualhost.storeTransactionIdleTimeoutClose}",
description = "The maximum length of time, in milliseconds, that an open store transaction may "
+ "remain idle. If a transaction exceeds this threshold, the resource that "
+ "created the transaction will be closed automatically.")
long getStoreTransactionIdleTimeoutClose();
@ManagedContextDefault( name = "virtualhost.storeTransactionIdleTimeoutWarn")
public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 180000l;
@ManagedAttribute( defaultValue = "${virtualhost.storeTransactionIdleTimeoutWarn}",
description = "The maximum length of time, in milliseconds, that an open store transaction may "
+ "remain idle. If a transaction exceeds this threshold, warnings will be "
+ "written to the logs.")
long getStoreTransactionIdleTimeoutWarn();
@ManagedContextDefault( name = "virtualhost.storeTransactionOpenTimeoutClose")
public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l;
@ManagedAttribute( defaultValue = "${virtualhost.storeTransactionOpenTimeoutClose}",
description = "The maximum length of time, in milliseconds, that a store transaction may "
+ "remain open. If a transaction exceeds this threshold, the resource that "
+ "created the transaction will be closed automatically.")
long getStoreTransactionOpenTimeoutClose();
@ManagedContextDefault( name = "virtualhost.storeTransactionOpenTimeoutWarn")
public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 300000l;
@ManagedAttribute( defaultValue = "${virtualhost.storeTransactionOpenTimeoutWarn}",
description = "The maximum length of time, in milliseconds, that a store transaction may "
+ "remain open. If a transaction exceeds this threshold, warnings will be "
+ "written to the logs.")
long getStoreTransactionOpenTimeoutWarn();
@ManagedAttribute( defaultValue = "${virtualhost.housekeepingCheckPeriod}")
long getHousekeepingCheckPeriod();
@DerivedAttribute( description = "Time (in milliseconds) between checks whether existing messages "
+ "need to be flowed to disk in order to free memory." )
long getFlowToDiskCheckPeriod();
@DerivedAttribute( description = "Indicates whether global shared durable subscriptions are disabled")
boolean isDiscardGlobalSharedSubscriptionLinksOnDetach();
String VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = "virtualhost.connectionThreadPool.size";
@SuppressWarnings("unused")
@ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE)
long DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors() * 2, 64);
@ManagedAttribute( defaultValue = "${" + QueueManagingVirtualHost.VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE + "}")
int getConnectionThreadPoolSize();
String VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS = "virtualhost.connectionThreadPool.numberOfSelectors";
@SuppressWarnings("unused")
@ManagedContextDefault( name = VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS)
long DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS = Math.max(DEFAULT_VIRTUALHOST_CONNECTION_THREAD_POOL_SIZE/8, 1);
String NAMED_CACHE_MAXIMUM_SIZE = "virtualhost.namedCache.maximumSize";
@SuppressWarnings("unused")
@ManagedContextDefault(name = NAMED_CACHE_MAXIMUM_SIZE, description = "Maximum number of entries within the named cached")
int DEFAULT_NAMED_CACHE_SIZE = 100;
String NAMED_CACHE_MAXIMUM_SIZE_FORMAT = "virtualhost.namedCache.%s.maximumSize";
String NAMED_CACHE_EXPIRATION = "virtualhost.namedCache.expiration";
@SuppressWarnings("unused")
@ManagedContextDefault(name = NAMED_CACHE_EXPIRATION, description = "Expiration time (in millis) applied to cached values within the named cache")
long DEFAULT_NAMED_CACHE_EXPIRATION = 300 * 1000;
String NAMED_CACHE_EXPIRATION_FORMAT = "virtualhost.namedCache.%s.expiration";
@ManagedAttribute( defaultValue = "${" + QueueManagingVirtualHost.VIRTUALHOST_CONNECTION_THREAD_POOL_NUMBER_OF_SELECTORS + "}")
int getNumberOfSelectors();
@ManagedAttribute( defaultValue = "${virtualhost.housekeepingThreadCount}")
int getHousekeepingThreadCount();
@ManagedAttribute( defaultValue = "[]",
description = "a list of policies used for auto-creating nodes (such as Queues or Exchanges) when an "
+ "address is published to or subscribed from and no node matching the address currently "
+ "exists. Each policy describes a pattern to match against the address, the circumstances "
+ "when auto-creation should occur (on publish, on consume, or both), the type of node to be "
+ "created, and the properties of the node.")
List<NodeAutoCreationPolicy> getNodeAutoCreationPolicies();
@ManagedContextDefault( name = "virtualhost.enabledConnectionValidators")
String DEFAULT_ENABLED_VALIDATORS = "[]";
@ManagedAttribute( defaultValue = "${virtualhost.enabledConnectionValidators}")
List<String> getEnabledConnectionValidators();
@ManagedContextDefault( name = "virtualhost.disabledConnectionValidators")
String DEFAULT_DISABLED_VALIDATORS = "[]";
@ManagedAttribute( defaultValue = "${virtualhost.disabledConnectionValidators}")
List<String> getDisabledConnectionValidators();
@Override
@ManagedAttribute( defaultValue = "[]")
List<String> getGlobalAddressDomains();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Queues",
description = "Current number of queues on this virtualhost.", metricName = "queues_total")
long getQueueCount();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Exchanges",
description = "Current number of exchanges on this virtualhost.", metricName = "exchanges_total")
long getExchangeCount();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Connections",
description = "Current number of messaging connections made to this virtualhost.", metricName = "connections_total")
long getConnectionCount();
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.COUNT, label = "Total Connections",
description = "Total number of messaging connections made to this virtualhost since broker startup", metricName = "aggregate_connection_count")
long getTotalConnectionCount();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Inbound",
description = "Total size of all messages received by this virtualhost.", metricName = "inbound_bytes_count")
long getBytesIn();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Outbound",
description = "Total size of all messages delivered by this virtualhost.", metricName = "outbound_bytes_count")
long getBytesOut();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Inbound",
description = "Total number of messages received by this virtualhost.", metricName = "inbound_messages_count")
long getMessagesIn();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES, label = "Outbound",
description = "Total number of messages delivered by this virtualhost.", metricName = "outbound_messages_count")
long getMessagesOut();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
label = "Transacted Inbound",
description = "Total number of messages delivered by this virtualhost within a transaction.", metricName = "inbound_transacted_messages_count")
long getTransactedMessagesIn();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.MESSAGES,
label = "Transacted Outbound",
description = "Total number of messages received by this virtualhost within a transaction.", metricName = "outbound_transacted_messages_count")
long getTransactedMessagesOut();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth",
description = "Current size of all messages enqueued by this virtualhost.", metricName = "queue_depth_bytes_total")
long getTotalDepthOfQueuesBytes();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Queue Depth",
description = "Current number of messages enqueued by this virtualhost.", metricName = "queue_depth_messages_total")
long getTotalDepthOfQueuesMessages();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "In-Memory Message Bytes",
description="Current size of all messages cached in-memory.", metricName = "in_memory_message_size_bytes_total")
long getInMemoryMessageSize();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = StatisticUnit.BYTES, label = "Evacuated Message Bytes",
description = "Total Number of Bytes Evacuated from Memory Due to Flow to Disk.")
long getBytesEvacuatedFromMemory();
@SuppressWarnings("unused")
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME,
units = StatisticUnit.BYTES,
label = "Maximum recorded size of inbound messages",
description = "Maximum size of message published into the Virtual Host since start-up.", metricName = "inbound_message_size_high_water_mark")
long getInboundMessageSizeHighWatermark();
@Override
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
Collection<? extends Connection<?>> getConnections();
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
Connection<?> getConnection(@Param(name="name", mandatory = true) String name);
@ManagedOperation(secure = true,
description = "Publishes a message to a specified address. "
+ "Returns the number of queues onto which it has been placed, "
+ " or zero, if the address routes to no queues.",
changesConfiguredObjectState = false)
int publishMessage(@Param(name = "message", mandatory = true)ManageableMessage message);
@ManagedOperation(nonModifying = true,
description = "Extract configuration",
paramRequiringSecure = "includeSecureAttributes",
changesConfiguredObjectState = false)
Map<String,Object> extractConfig(@Param(name="includeSecureAttributes",
description = "include attributes that may contain passwords or other "
+ "confidential information",
defaultValue = "false") boolean includeSecureAttributes);
@ManagedOperation(nonModifying = true,
description = "Extract message store content",
secure = true,
changesConfiguredObjectState = false)
Content exportMessageStore();
@ManagedOperation(description = "Import message store content",
secure = true,
changesConfiguredObjectState = false)
void importMessageStore(@Param(name="source", description = "Extract file", mandatory = true)String source);
@ManagedOperation(nonModifying = true,
description = "Returns metadata concerning the current connection",
changesConfiguredObjectState = false,
skipAclCheck = true)
SocketConnectionMetaData getConnectionMetaData();
@SuppressWarnings("unused")
@ManagedOperation(nonModifying = true, description = "Dumps link registry", changesConfiguredObjectState = false)
Object dumpLinkRegistry();
@SuppressWarnings("unused")
@ManagedOperation(description = "Removes links with the given name and containerId pattern from the link registry.", changesConfiguredObjectState = false)
void purgeLinkRegistry(@Param(name = "containerIdPattern", description = "Regular Expression to match the remote container id.", defaultValue = ".*") String containerIdPattern,
@Param(name = "role", description = "whether to remove only sending links (\"SENDER\"), receiving links (\"RECEIVER\") or both (\"BOTH\")", validValues = {"SENDER", "RECEIVER", "BOTH"}, defaultValue = "BOTH") String role,
@Param(name = "linkNamePattern", description = "Regular Expression to match the link names to be removed.", defaultValue = ".*") String linkNamePattern);
Queue<?> getSubscriptionQueue(final String exchangeName,
final Map<String, Object> attributes,
final Map<String, Map<String, Object>> bindings);
void removeSubscriptionQueue(final String queueName);
Broker<?> getBroker();
DurableConfigurationStore getDurableConfigurationStore();
void executeTransaction(TransactionalOperation op);
void executeTask(String name, Runnable task, AccessControlContext context);
void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
Queue<?> getAttainedQueue(UUID id);
Queue<?> getAttainedQueue(String name);
<T extends ConfiguredObject<?>> T getAttainedChildFromAddress(Class<T> childClass,
String address);
void setFirstOpening(boolean firstOpening);
long getTargetSize();
void setTargetSize(long targetSize);
MessageDestination getSystemDestination(String name);
ListenableFuture<Void> reallocateMessages();
boolean isOverTargetSize();
interface Transaction
{
void dequeue(QueueEntry entry);
void copy(QueueEntry entry, Queue<?> queue);
void move(QueueEntry entry, Queue<?> queue);
}
interface TransactionalOperation
{
void withinTransaction(Transaction txn);
List<Long> getModifiedMessageIds();
}
}