blob: 5109bd2bf8ecce024643c13bb6b1089e7d529d11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.replay.ReplayManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.utils.JsonLoader;
public class AddressControlImpl extends AbstractControl implements AddressControl {
private AddressInfo addressInfo;
private final ActiveMQServer server;
private final PagingManager pagingManager;
private final HierarchicalRepository<Set<Role>> securityRepository;
private final SecurityStore securityStore;
private final ManagementService managementService;
public AddressControlImpl(AddressInfo addressInfo,
final ActiveMQServer server,
final PagingManager pagingManager,
final StorageManager storageManager,
final HierarchicalRepository<Set<Role>> securityRepository,
final SecurityStore securityStore,
final ManagementService managementService)throws Exception {
super(AddressControl.class, storageManager);
this.server = server;
this.addressInfo = addressInfo;
this.pagingManager = pagingManager;
this.securityRepository = securityRepository;
this.securityStore = securityStore;
this.managementService = managementService;
}
// AddressControlMBean implementation ----------------------------
@Override
public String getAddress() {
return addressInfo.getName().toString();
}
@Override
public String[] getRoutingTypes() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getRoutingTypes(this.addressInfo);
}
EnumSet<RoutingType> routingTypes = addressInfo.getRoutingTypes();
String[] result = new String[routingTypes.size()];
int i = 0;
for (RoutingType routingType : routingTypes) {
result[i++] = routingType.toString();
}
return result;
}
@Override
public String getRoutingTypesAsJSON() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getRoutingTypesAsJSON(this.addressInfo);
}
clearIO();
try {
JsonArrayBuilder json = JsonLoader.createArrayBuilder();
String[] routingTypes = getRoutingTypes();
for (String routingType : routingTypes) {
json.add(routingType);
}
return json.build().toString();
} finally {
blockOnIO();
}
}
@Override
public String[] getRemoteQueueNames() {
return getQueueNames(SearchType.REMOTE);
}
@Override
public String[] getQueueNames() {
return getQueueNames(SearchType.LOCAL);
}
@Override
public String[] getAllQueueNames() {
return getQueueNames(SearchType.ALL);
}
enum SearchType {
LOCAL, REMOTE, ALL
}
private String[] getQueueNames(SearchType searchType) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getQueueNames(this.addressInfo, searchType);
}
clearIO();
try {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
List<String> queueNames = new ArrayList<>();
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding && ((searchType == SearchType.ALL) || (searchType == SearchType.LOCAL && binding.isLocal()) || (searchType == SearchType.REMOTE && binding instanceof RemoteQueueBinding))) {
queueNames.add(binding.getUniqueName().toString());
}
}
return queueNames.toArray(new String[queueNames.size()]);
} else {
return new String[0];
}
} catch (Throwable t) {
throw new IllegalStateException(t.getMessage());
} finally {
blockOnIO();
}
}
@Override
public String[] getBindingNames() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getBindingNames(this.addressInfo);
}
try {
clearIO();
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
String[] bindingNames = new String[bindings.getBindings().size()];
int i = 0;
for (Binding binding : bindings.getBindings()) {
bindingNames[i++] = binding.getUniqueName().toString();
}
return bindingNames;
} else {
return new String[0];
}
} catch (Throwable t) {
throw new IllegalStateException(t.getMessage());
} finally {
blockOnIO();
}
}
@Override
public Object[] getRoles() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getRoles(this.addressInfo);
}
clearIO();
try {
Set<Role> roles = securityRepository.getMatch(addressInfo.getName().toString());
Object[] objRoles = new Object[roles.size()];
int i = 0;
for (Role role : roles) {
objRoles[i++] = new Object[]{
role.getName(),
CheckType.SEND.hasRole(role),
CheckType.CONSUME.hasRole(role),
CheckType.CREATE_DURABLE_QUEUE.hasRole(role),
CheckType.DELETE_DURABLE_QUEUE.hasRole(role),
CheckType.CREATE_NON_DURABLE_QUEUE.hasRole(role),
CheckType.DELETE_NON_DURABLE_QUEUE.hasRole(role),
CheckType.MANAGE.hasRole(role),
CheckType.BROWSE.hasRole(role),
CheckType.CREATE_ADDRESS.hasRole(role),
CheckType.DELETE_ADDRESS.hasRole(role)
};
}
return objRoles;
} finally {
blockOnIO();
}
}
@Override
public String getRolesAsJSON() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getRolesAsJSON(this.addressInfo);
}
clearIO();
try {
JsonArrayBuilder json = JsonLoader.createArrayBuilder();
Set<Role> roles = securityRepository.getMatch(addressInfo.getName().toString());
for (Role role : roles) {
json.add(role.toJson());
}
return json.build().toString();
} finally {
blockOnIO();
}
}
@Override
public long getNumberOfBytesPerPage() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getNumberOfBytesPerPage(this.addressInfo);
}
clearIO();
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return 0;
}
return pagingStore.getPageSizeBytes();
} finally {
blockOnIO();
}
}
private PagingStore getPagingStore() throws Exception {
return pagingManager.getPageStore(addressInfo.getName());
}
@Override
public long getAddressSize() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getAddressSize(this.addressInfo);
}
clearIO();
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return 0;
}
return pagingStore.getAddressSize();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to get address size", e);
return -1;
} finally {
blockOnIO();
}
}
@Override
public void schedulePageCleanup() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.schedulePageCleanup(this.addressInfo);
}
clearIO();
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return;
}
pagingStore.getCursorProvider().scheduleCleanup();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to schedule pageCleanup", e);
throw e;
} finally {
blockOnIO();
}
}
@Override
@Deprecated
public long getNumberOfMessages() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getNumberOfMessages(this.addressInfo);
}
clearIO();
try {
return getMessageCount();
} catch (Throwable t) {
throw new IllegalStateException(t.getMessage());
} finally {
blockOnIO();
}
}
@Override
public boolean isPaging() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isPaging(this.addressInfo);
}
clearIO();
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return false;
}
return pagingStore.isPaging();
} finally {
blockOnIO();
}
}
@Override
public int getAddressLimitPercent() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getAddressLimitPercent(this.addressInfo);
}
clearIO();
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return 0;
}
return pagingStore.getAddressLimitPercent();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to get address limit %", e);
return -1;
} finally {
blockOnIO();
}
}
@Override
public boolean block() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.block(this.addressInfo);
}
clearIO();
boolean result = false;
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore != null) {
pagingStore.block();
result = true;
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to block", e);
} finally {
blockOnIO();
}
return result;
}
@Override
public void unblock() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.unBlock(this.addressInfo);
}
clearIO();
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore != null) {
pagingStore.unblock();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to unblock", e);
} finally {
blockOnIO();
}
}
@Override
public long getNumberOfPages() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getNumberOfPages(this.addressInfo);
}
clearIO();
try {
final PagingStore pageStore = getPagingStore();
if (pageStore == null || !pageStore.isPaging()) {
return 0;
} else {
return pageStore.getNumberOfPages();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to get number of pages", e);
return -1;
} finally {
blockOnIO();
}
}
@Override
public long getMessageCount() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getMessageCount(this.addressInfo);
}
return getMessageCount(DurabilityType.ALL);
}
@Override
public long getRoutedMessageCount() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getRoutedMessageCount(this.addressInfo);
}
return addressInfo.getRoutedMessageCount();
}
@Override
public long getUnRoutedMessageCount() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getUnRoutedMessageCount(this.addressInfo);
}
return addressInfo.getUnRoutedMessageCount();
}
@Override
public String sendMessage(final Map<String, String> headers,
final int type,
final String body,
boolean durable,
final String user,
final String password) throws Exception {
return sendMessage(headers, type, body, durable, user, password, false);
}
@Override
public String sendMessage(final Map<String, String> headers,
final int type,
final String body,
boolean durable,
final String user,
final String password,
boolean createMessageId) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.sendMessageThroughManagement(this, headers, type, body, durable, user, "****");
}
try {
return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password, createMessageId);
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException(e.getMessage());
}
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(AddressControl.class);
}
@Override
protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(AddressControl.class);
}
@Override
public void pause() {
pause(false);
}
@Override
public void pause(boolean persist) {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.pause(addressInfo);
}
checkStarted();
clearIO();
try {
addressInfo.setPostOffice(server.getPostOffice());
addressInfo.setStorageManager(server.getStorageManager());
addressInfo.pause(persist);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.pauseAddressSuccess(addressInfo.getName().toString());
}
} catch (Exception e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.pauseAddressFailure(addressInfo.getName().toString());
}
throw e;
} finally {
blockOnIO();
}
}
@Override
public void resume() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.resume(addressInfo);
}
checkStarted();
clearIO();
try {
addressInfo.setPostOffice(server.getPostOffice());
addressInfo.setStorageManager(server.getStorageManager());
addressInfo.resume();
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.resumeAddressSuccess(addressInfo.getName().toString());
}
} catch (Exception e) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.resumeAddressFailure(addressInfo.getName().toString());
}
throw e;
} finally {
blockOnIO();
}
}
@Override
public boolean isPaused() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isPaused(this.addressInfo);
}
return addressInfo.isPaused();
}
@Override
public boolean isRetroactiveResource() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isRetroactiveResource(this.addressInfo);
}
return ResourceNames.isRetroactiveResource(server.getInternalNamingPrefix(), addressInfo.getName());
}
@Override
public long getCurrentDuplicateIdCacheSize() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getCurrentDuplicateIdCacheSize(this.addressInfo);
}
DuplicateIDCache cache = ((PostOfficeImpl)server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
try {
if (cache != null) {
return cache.getMap().size();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to get duplicate ID cache size", e);
}
return 0;
}
@Override
public boolean clearDuplicateIdCache() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.clearDuplicateIdCache(this.addressInfo);
}
DuplicateIDCache cache = ((PostOfficeImpl)server.getPostOffice()).getDuplicateIDCaches().get(addressInfo.getName());
try {
if (cache != null) {
cache.clear();
return true;
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to clear duplicate ID cache", e);
}
return false;
}
@Override
public boolean isAutoCreated() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isAutoCreated(this.addressInfo);
}
return addressInfo.isAutoCreated();
}
@Override
public boolean isInternal() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isInternal(this.addressInfo);
}
return addressInfo.isInternal();
}
@Override
public boolean isTemporary() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isTemporary(this.addressInfo);
}
return addressInfo.isTemporary();
}
@Override
public long purge() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.purge(this.addressInfo);
}
clearIO();
long totalMsgs = 0;
try {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
}
}
}
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.purgeAddressSuccess(addressInfo.getName().toString());
}
} catch (Throwable t) {
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.purgeAddressFailure(addressInfo.getName().toString());
}
throw new IllegalStateException(t.getMessage());
} finally {
blockOnIO();
}
return totalMsgs;
}
@Override
public void replay(String target, String filter) throws Exception {
server.replay(null, null, this.getAddress(), target, filter);
}
@Override
public void replay(String startScan, String endScan, String target, String filter) throws Exception {
SimpleDateFormat format = ReplayManager.newRetentionSimpleDateFormat();
Date startScanDate = format.parse(startScan);
Date endScanDate = format.parse(endScan);
server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
}
private long getMessageCount(final DurabilityType durability) {
long count = 0;
for (String queueName : getQueueNames()) {
Queue queue = server.locateQueue(queueName);
if (queue != null &&
(durability == DurabilityType.ALL ||
(durability == DurabilityType.DURABLE && queue.isDurable()) ||
(durability == DurabilityType.NON_DURABLE && !queue.isDurable()))) {
count += queue.getMessageCount();
}
}
return count;
}
private void checkStarted() {
if (!server.getPostOffice().isStarted()) {
throw new IllegalStateException("Broker is not started. Queues can not be managed yet");
}
}
private enum DurabilityType {
ALL, DURABLE, NON_DURABLE
}
}