| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.management.amqp; |
| |
| import java.io.Serializable; |
| import java.lang.reflect.ParameterizedType; |
| import java.lang.reflect.Type; |
| import java.lang.reflect.TypeVariable; |
| import java.lang.reflect.WildcardType; |
| import java.nio.charset.Charset; |
| import java.security.AccessControlException; |
| import java.security.AccessController; |
| import java.text.MessageFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.regex.Pattern; |
| |
| import javax.security.auth.Subject; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.server.configuration.IllegalConfigurationException; |
| import org.apache.qpid.server.connection.SessionPrincipal; |
| import org.apache.qpid.server.consumer.ConsumerOption; |
| import org.apache.qpid.server.consumer.ConsumerTarget; |
| import org.apache.qpid.server.filter.FilterManager; |
| import org.apache.qpid.server.filter.Filterable; |
| import org.apache.qpid.server.message.AMQMessageHeader; |
| import org.apache.qpid.server.message.InstanceProperties; |
| import org.apache.qpid.server.message.MessageDestination; |
| import org.apache.qpid.server.message.MessageInstance; |
| import org.apache.qpid.server.message.MessageInstanceConsumer; |
| import org.apache.qpid.server.message.MessageSender; |
| import org.apache.qpid.server.message.MessageSource; |
| import org.apache.qpid.server.message.ServerMessage; |
| import org.apache.qpid.server.message.internal.InternalMessage; |
| import org.apache.qpid.server.message.internal.InternalMessageHeader; |
| import org.apache.qpid.server.model.AbstractConfiguredObject; |
| import org.apache.qpid.server.model.ConfiguredObject; |
| import org.apache.qpid.server.model.ConfiguredObjectFinder; |
| import org.apache.qpid.server.model.ConfiguredObjectOperation; |
| import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry; |
| import org.apache.qpid.server.model.IntegrityViolationException; |
| import org.apache.qpid.server.model.ManagedObject; |
| import org.apache.qpid.server.model.Model; |
| import org.apache.qpid.server.model.NamedAddressSpace; |
| import org.apache.qpid.server.model.OperationParameter; |
| import org.apache.qpid.server.model.PublishingLink; |
| import org.apache.qpid.server.plugin.MessageConverter; |
| import org.apache.qpid.server.protocol.AMQSessionModel; |
| import org.apache.qpid.server.protocol.MessageConverterRegistry; |
| import org.apache.qpid.server.security.SecurityToken; |
| import org.apache.qpid.server.store.MessageDurability; |
| import org.apache.qpid.server.store.MessageEnqueueRecord; |
| import org.apache.qpid.server.store.StorableMessageMetaData; |
| import org.apache.qpid.server.store.TransactionLogResource; |
| import org.apache.qpid.server.txn.AutoCommitTransaction; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| import org.apache.qpid.server.util.Action; |
| import org.apache.qpid.server.util.ServerScopedRuntimeException; |
| import org.apache.qpid.server.util.StateChangeListener; |
| |
| class ManagementNode implements MessageSource, MessageDestination |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(ManagementNode.class); |
| |
| public static final String IDENTITY_ATTRIBUTE = "identity"; |
| public static final String INDEX_ATTRIBUTE = "index"; |
| public static final String KEY_ATTRIBUTE = "key"; |
| public static final String ACTUALS_ATTRIBUTE = "actuals"; |
| |
| public static final String TYPE_ATTRIBUTE = "type"; |
| public static final String OPERATION_HEADER = "operation"; |
| public static final String SELF_NODE_NAME = "self"; |
| public static final String MANAGEMENT_TYPE = "org.amqp.management"; |
| public static final String GET_TYPES = "GET-TYPES"; |
| public static final String GET_ATTRIBUTES = "GET-ATTRIBUTES"; |
| public static final String GET_OPERATIONS = "GET-OPERATIONS"; |
| public static final String QUERY = "QUERY"; |
| public static final String ENTITY_TYPE_HEADER = "entityType"; |
| public static final String STATUS_CODE_HEADER = "statusCode"; |
| public static final String OFFSET_HEADER = "offset"; |
| public static final String COUNT_HEADER = "count"; |
| public static final String MANAGEMENT_NODE_NAME = "$management"; |
| public static final String STATUS_DESCRIPTION_HEADER = "statusDescription"; |
| public static final String ATTRIBUTES_HEADER = "attributes"; |
| public static final String ATTRIBUTE_NAMES = "attributeNames"; |
| public static final String RESULTS = "results"; |
| static final String OBJECT_PATH = "object-path"; |
| static final String QPID_TYPE = "qpid-type"; |
| |
| |
| public static final int STATUS_CODE_OK = 200; |
| private static final int STATUS_CODE_CREATED = 201; |
| public static final int STATUS_CODE_NO_CONTENT = 204; |
| public static final int STATUS_CODE_BAD_REQUEST = 400; |
| public static final int STATUS_CODE_FORBIDDEN = 403; |
| public static final int STATUS_CODE_NOT_FOUND = 404; |
| public static final int STATUS_CODE_CONFLICT = 409; |
| public static final int STATUS_CODE_INTERNAL_ERROR = 500; |
| public static final int STATUS_CODE_NOT_IMPLEMENTED= 501; |
| private static final Comparator<? super ConfiguredObject<?>> OBJECT_COMPARATOR = |
| new Comparator<ConfiguredObject<?>>() |
| { |
| @Override |
| public int compare(final ConfiguredObject<?> o1, final ConfiguredObject<?> o2) |
| { |
| if(o1 == o2) |
| { |
| return 0; |
| } |
| int result = o1.getCategoryClass().getSimpleName().compareTo(o2.getCategoryClass().getSimpleName()); |
| if(result == 0) |
| { |
| result = o1.getName().compareTo(o2.getName()); |
| } |
| if(result == 0) |
| { |
| result = o1.getId().compareTo(o2.getId()); |
| } |
| return result; |
| } |
| }; |
| |
| |
| private final NamedAddressSpace _addressSpace; |
| |
| private final UUID _id; |
| |
| private final ConfiguredObject<?> _managedObject; |
| private final Model _model; |
| private final Map<Class<? extends ConfiguredObject>, ConfiguredObjectOperation<?>> _associatedChildrenOperations = new HashMap<>(); |
| private final ConfiguredObjectFinder _configuredObjectFinder; |
| private List<ManagementNodeConsumer> _consumers = new CopyOnWriteArrayList<>(); |
| |
| private final Set<Class<? extends ConfiguredObject>> _managedCategories = new HashSet<>(); |
| private final Map<String, Class<? extends ConfiguredObject>> _managedTypes = new HashMap<>(); |
| private final Map<Class<? extends ConfiguredObject>, Map<String, StandardOperation>> _standardOperations = new HashMap<>(); |
| |
| private final ManagementOutputConverter _managementOutputConverter; |
| |
| private final ManagementInputConverter _managementInputConverter; |
| |
| ManagementNode(final NamedAddressSpace addressSpace, |
| final ConfiguredObject<?> configuredObject) |
| { |
| _addressSpace = addressSpace; |
| final String name = configuredObject.getId() + MANAGEMENT_NODE_NAME; |
| _id = UUID.nameUUIDFromBytes(name.getBytes(Charset.defaultCharset())); |
| |
| _model = configuredObject.getModel(); |
| _managedObject = configuredObject; |
| |
| populateMetaData(); |
| _managementOutputConverter = new ManagementOutputConverter(this); |
| _managementInputConverter = new ManagementInputConverter(this); |
| |
| _configuredObjectFinder = new ConfiguredObjectFinder(configuredObject); |
| } |
| |
| ConfiguredObject<?> getManagedObject() |
| { |
| return _managedObject; |
| } |
| |
| boolean isSyntheticChildClass(Class<? extends ConfiguredObject> clazz) |
| { |
| return _associatedChildrenOperations.containsKey(clazz); |
| } |
| |
| private void populateMetaData() |
| { |
| populateManagedCategories(); |
| |
| populateManagedTypes(); |
| |
| populateStandardOperations(); |
| } |
| |
| private void populateStandardOperations() |
| { |
| for(Class<? extends ConfiguredObject> type : _managedTypes.values()) |
| { |
| HashMap<String, StandardOperation> operationsMap = new HashMap<>(); |
| _standardOperations.put(type, operationsMap); |
| operationsMap.put(READ_OPERATION.getName(), READ_OPERATION); |
| operationsMap.put(UPDATE_OPERATION.getName(), UPDATE_OPERATION); |
| if(ConfiguredObjectTypeRegistry.getCategory(type) != _managedObject.getCategoryClass()) |
| { |
| operationsMap.put(DELETE_OPERATION.getName(), DELETE_OPERATION); |
| if(type.getAnnotation(ManagedObject.class).creatable()) |
| { |
| operationsMap.put(CREATE_OPERATION.getName(), CREATE_OPERATION); |
| } |
| } |
| } |
| } |
| |
| private void populateManagedTypes() |
| { |
| for(Class<? extends ConfiguredObject> category : _managedCategories) |
| { |
| _managedTypes.put(getAmqpName(category), category); |
| if(category != _managedObject.getCategoryClass()) |
| { |
| for (Class<? extends ConfiguredObject> type : _model.getTypeRegistry().getTypeSpecialisations(category)) |
| { |
| if (type.getAnnotation(ManagedObject.class) != null) |
| { |
| _managedTypes.put(getAmqpName(type), type); |
| } |
| } |
| } |
| else if(_managedObject.getTypeClass() != _managedObject.getCategoryClass()) |
| { |
| _managedTypes.put(getAmqpName(_managedObject.getTypeClass()), _managedObject.getTypeClass()); |
| } |
| } |
| } |
| |
| private void populateManagedCategories() |
| { |
| Class<? extends ConfiguredObject> managedCategory = _managedObject.getCategoryClass(); |
| addManagedCategories(managedCategory); |
| |
| |
| for(ConfiguredObjectOperation<?> operation : _model.getTypeRegistry().getOperations(managedCategory).values()) |
| { |
| if(operation.isAssociateAsIfChildren() && returnsCollectionOfConfiguredObjects(operation)) |
| { |
| @SuppressWarnings("unchecked") |
| Class<? extends ConfiguredObject> associatedChildCategory = |
| (getCollectionMemberType((ParameterizedType) operation.getGenericReturnType())); |
| _associatedChildrenOperations.put(associatedChildCategory, operation); |
| addManagedCategories(associatedChildCategory); |
| } |
| } |
| } |
| |
| |
| private boolean returnsCollectionOfConfiguredObjects(ConfiguredObjectOperation operation) |
| { |
| return Collection.class.isAssignableFrom(operation.getReturnType()) |
| && operation.getGenericReturnType() instanceof ParameterizedType |
| && ConfiguredObject.class.isAssignableFrom(getCollectionMemberType((ParameterizedType) operation.getGenericReturnType())); |
| } |
| |
| private Class getCollectionMemberType(ParameterizedType collectionType) |
| { |
| return getRawType((collectionType).getActualTypeArguments()[0]); |
| } |
| |
| private static Class getRawType(Type t) |
| { |
| if(t instanceof Class) |
| { |
| return (Class)t; |
| } |
| else if(t instanceof ParameterizedType) |
| { |
| return (Class)((ParameterizedType)t).getRawType(); |
| } |
| else if(t instanceof TypeVariable) |
| { |
| Type[] bounds = ((TypeVariable)t).getBounds(); |
| if(bounds.length == 1) |
| { |
| return getRawType(bounds[0]); |
| } |
| } |
| else if(t instanceof WildcardType) |
| { |
| Type[] upperBounds = ((WildcardType)t).getUpperBounds(); |
| if(upperBounds.length == 1) |
| { |
| return getRawType(upperBounds[0]); |
| } |
| } |
| throw new ServerScopedRuntimeException("Unable to process type when constructing configuration model: " + t); |
| } |
| |
| |
| String getAmqpName(final Class<? extends ConfiguredObject> type) |
| { |
| ManagedObject annotation = type.getAnnotation(ManagedObject.class); |
| |
| return "".equals(annotation.amqpName()) ? type.getName() : annotation.amqpName(); |
| } |
| |
| private void addManagedCategories(Class<? extends ConfiguredObject> category) |
| { |
| if(_managedCategories.add(category)) |
| { |
| for(Class<? extends ConfiguredObject> childClass : _model.getChildTypes(category)) |
| { |
| addManagedCategories(childClass); |
| } |
| |
| } |
| } |
| |
| @Override |
| public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, |
| final String routingAddress, |
| final InstanceProperties instanceProperties, |
| final ServerTransaction txn, |
| final Action<? super MessageInstance> postEnqueueAction) |
| { |
| if(message.isResourceAcceptable(this)) |
| { |
| @SuppressWarnings("unchecked") |
| MessageConverter<M, InternalMessage> converter = |
| MessageConverterRegistry.getConverter(((Class<M>) message.getClass()), InternalMessage.class); |
| |
| |
| if (converter != null) |
| { |
| final InternalMessage msg = converter.convert(message, _addressSpace); |
| txn.addPostTransactionAction(new ServerTransaction.Action() |
| { |
| @Override |
| public void postCommit() |
| { |
| enqueue(msg, instanceProperties, postEnqueueAction); |
| } |
| |
| @Override |
| public void onRollback() |
| { |
| |
| } |
| }); |
| |
| return 1; |
| } |
| else |
| { |
| return 0; |
| } |
| } |
| else |
| { |
| return 0; |
| } |
| } |
| |
| @Override |
| public boolean isDurable() |
| { |
| return true; |
| } |
| |
| @Override |
| public void linkAdded(final MessageSender sender, final PublishingLink link) |
| { |
| |
| } |
| |
| @Override |
| public void linkRemoved(final MessageSender sender, final PublishingLink link) |
| { |
| |
| } |
| |
| private synchronized void enqueue(InternalMessage message, |
| InstanceProperties properties, |
| Action<? super MessageInstance> postEnqueueAction) |
| { |
| if(postEnqueueAction != null) |
| { |
| postEnqueueAction.performAction(new ConsumedMessageInstance(message, properties)); |
| } |
| |
| |
| |
| String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE); |
| String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE); |
| String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER); |
| |
| InternalMessage response; |
| |
| // TODO - handle runtime exceptions |
| |
| if(SELF_NODE_NAME.equals(id) && type.equals(MANAGEMENT_TYPE)) |
| { |
| response = performManagementOperation(operation, message); |
| } |
| else if(_managedTypes.containsKey(type)) |
| { |
| response = performOperation(_managedTypes.get(type), operation, message); |
| |
| } |
| else |
| { |
| response = createFailureResponse(message, |
| STATUS_CODE_NOT_FOUND, |
| "Unknown type {0}", type); |
| } |
| |
| sendResponse(message, response); |
| |
| } |
| |
| private interface StandardOperation |
| { |
| String getName(); |
| InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message); |
| |
| } |
| |
| |
| private final StandardOperation CREATE_OPERATION = |
| new StandardOperation() |
| { |
| @Override |
| public String getName() |
| { |
| return "CREATE"; |
| } |
| |
| @Override |
| public InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message) |
| { |
| return performCreateOperation(clazz, message); |
| } |
| }; |
| |
| |
| private final StandardOperation READ_OPERATION = |
| new StandardOperation() |
| { |
| @Override |
| public String getName() |
| { |
| return "READ"; |
| } |
| |
| @Override |
| public InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message) |
| { |
| return performReadOperation(clazz, message); |
| } |
| }; |
| |
| |
| private final StandardOperation UPDATE_OPERATION = |
| new StandardOperation() |
| { |
| @Override |
| public String getName() |
| { |
| return "UPDATE"; |
| } |
| |
| @Override |
| public InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message) |
| { |
| return performUpdateOperation(clazz, message); |
| } |
| }; |
| |
| |
| private final StandardOperation DELETE_OPERATION = |
| new StandardOperation() |
| { |
| @Override |
| public String getName() |
| { |
| return "DELETE"; |
| } |
| |
| @Override |
| public InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message) |
| { |
| return performDeleteOperation(clazz, message); |
| } |
| }; |
| |
| |
| private InternalMessage performOperation(final Class<? extends ConfiguredObject> clazz, |
| final String operation, |
| InternalMessage message) |
| { |
| final Map<String, ConfiguredObjectOperation<?>> operations = _model.getTypeRegistry().getOperations(clazz); |
| @SuppressWarnings("unchecked") |
| final ConfiguredObjectOperation<ConfiguredObject<?>> method = |
| (ConfiguredObjectOperation<ConfiguredObject<?>>) operations.get(operation); |
| StandardOperation standardOperation; |
| try |
| { |
| if (method != null) |
| { |
| return performConfiguredObjectOperation(clazz, message, method); |
| } |
| else if ((standardOperation = _standardOperations.get(clazz).get(operation)) != null) |
| { |
| return standardOperation.performOperation(clazz, message); |
| } |
| else |
| { |
| return createFailureResponse(message, STATUS_CODE_NOT_IMPLEMENTED, "Not implemented"); |
| } |
| } |
| catch (RuntimeException e) |
| { |
| return createFailureResponse(message, STATUS_CODE_INTERNAL_ERROR, e.getMessage()); |
| } |
| } |
| |
| private InternalMessage performDeleteOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message) |
| { |
| InternalMessageHeader requestHeader = message.getMessageHeader(); |
| final Map<String, Object> headers = requestHeader.getHeaderMap(); |
| |
| ConfiguredObject<?> object = findObject(clazz, headers); |
| if(object != null) |
| { |
| try |
| { |
| object.delete(); |
| |
| final MutableMessageHeader responseHeader = new MutableMessageHeader(); |
| responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null |
| ? requestHeader.getMessageId() |
| : requestHeader.getCorrelationId()); |
| responseHeader.setMessageId(UUID.randomUUID().toString()); |
| responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_NO_CONTENT); |
| |
| return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), |
| responseHeader, |
| Collections.emptyMap()); |
| } |
| catch (IntegrityViolationException e) |
| { |
| return createFailureResponse(message, STATUS_CODE_FORBIDDEN, e.getMessage()); |
| } |
| } |
| else |
| { |
| return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "Not Found"); |
| } |
| } |
| |
| private InternalMessage performUpdateOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message) |
| { |
| InternalMessageHeader requestHeader = message.getMessageHeader(); |
| |
| final Map<String, Object> headers = requestHeader.getHeaderMap(); |
| |
| ConfiguredObject<?> object = findObject(clazz, headers); |
| if(object != null) |
| { |
| if(message.getMessageBody() instanceof Map) |
| { |
| @SuppressWarnings("unchecked") |
| final HashMap<String, Object> attributes = new HashMap<>((Map) message.getMessageBody()); |
| Object id = attributes.remove(IDENTITY_ATTRIBUTE); |
| if (id != null && !String.valueOf(id).equals(object.getId().toString())) |
| { |
| return createFailureResponse(message, |
| STATUS_CODE_FORBIDDEN, |
| "Cannot change the value of '" + IDENTITY_ATTRIBUTE + "'"); |
| } |
| String path = (String) attributes.remove(OBJECT_PATH); |
| Class<? extends ConfiguredObject> parentType = _model.getParentType(clazz); |
| if(parentType != null) |
| { |
| String attributeName = parentType.getSimpleName().toLowerCase(); |
| final Object parentValue = attributes.remove(attributeName); |
| if (parentValue != null && !String.valueOf(parentValue) |
| .equals(object.getParent().getName())) |
| { |
| return createFailureResponse(message, |
| STATUS_CODE_FORBIDDEN, |
| "Cannot change the value of '" + attributeName + "'"); |
| } |
| } |
| if (path != null && !attributes.containsKey(ConfiguredObject.NAME)) |
| { |
| String[] pathElements = getPathElements(path); |
| attributes.put(ConfiguredObject.NAME, pathElements[pathElements.length - 1]); |
| } |
| object.setAttributes(attributes); |
| |
| final MutableMessageHeader responseHeader = new MutableMessageHeader(); |
| responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null |
| ? requestHeader.getMessageId() |
| : requestHeader.getCorrelationId()); |
| responseHeader.setMessageId(UUID.randomUUID().toString()); |
| responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); |
| |
| return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, |
| _managementOutputConverter.convertToOutput(object, true)); |
| } |
| else |
| { |
| return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "Message body must be a map"); |
| } |
| } |
| else |
| { |
| return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "No such object"); |
| } |
| } |
| |
| private String[] getPathElements(final String path) |
| { |
| String[] pathElements = path.split("(?<!\\\\)" + Pattern.quote("/")); |
| for(int i = 0; i<pathElements.length; i++) |
| { |
| pathElements[i] = pathElements[i].replaceAll("\\\\(.)","$1"); |
| } |
| return pathElements; |
| } |
| |
| |
| private InternalMessage performReadOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message) |
| { |
| InternalMessageHeader requestHeader = message.getMessageHeader(); |
| |
| final Map<String, Object> headers = requestHeader.getHeaderMap(); |
| final boolean actuals = headers.get(ACTUALS_ATTRIBUTE) == null || Boolean.parseBoolean(String.valueOf(headers.get(ACTUALS_ATTRIBUTE))); |
| |
| ConfiguredObject<?> object = findObject(clazz, headers); |
| if(object != null) |
| { |
| final MutableMessageHeader responseHeader = new MutableMessageHeader(); |
| responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null |
| ? requestHeader.getMessageId() |
| : requestHeader.getCorrelationId()); |
| responseHeader.setMessageId(UUID.randomUUID().toString()); |
| responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); |
| |
| // TODO - remove insecure on insecure channel |
| |
| return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, |
| _managementOutputConverter.convertToOutput(object, actuals)); |
| } |
| else |
| { |
| return createFailureResponse(message, |
| STATUS_CODE_NOT_FOUND, |
| "Not found"); |
| } |
| } |
| |
| private InternalMessage performCreateOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message) |
| { |
| InternalMessageHeader requestHeader = message.getMessageHeader(); |
| |
| final MutableMessageHeader responseHeader = new MutableMessageHeader(); |
| responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null |
| ? requestHeader.getMessageId() |
| : requestHeader.getCorrelationId()); |
| responseHeader.setMessageId(UUID.randomUUID().toString()); |
| responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_CREATED); |
| |
| if(message.getMessageBody() instanceof Map) |
| { |
| @SuppressWarnings("unchecked") |
| Map<String,Object> attributes = (Map<String,Object>) message.getMessageBody(); |
| if(attributes.containsKey(IDENTITY_ATTRIBUTE)) |
| { |
| return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The '"+IDENTITY_ATTRIBUTE+"' cannot be set when creating an object"); |
| } |
| if(attributes.containsKey(ConfiguredObject.ID)) |
| { |
| return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The '"+ConfiguredObject.ID+"' cannot be set when creating an object"); |
| } |
| if(!attributes.containsKey(QPID_TYPE) && ConfiguredObjectTypeRegistry.getCategory(clazz) != clazz) |
| { |
| Class<? extends ConfiguredObject> typeClass = _model.getTypeRegistry().getTypeClass(clazz); |
| String type = typeClass.getAnnotation(ManagedObject.class).type(); |
| if(!"".equals(type)) |
| { |
| attributes.put(QPID_TYPE, type); |
| } |
| } |
| |
| if(attributes.containsKey(OBJECT_PATH)) |
| { |
| String path = String.valueOf(attributes.remove(OBJECT_PATH)); |
| |
| ConfiguredObject theParent = _managedObject; |
| |
| final Class<? extends ConfiguredObject>[] hierarchy = _configuredObjectFinder.getHierarchy(clazz); |
| if (hierarchy.length > 1) |
| { |
| |
| ConfiguredObject parent = |
| _configuredObjectFinder.findObjectParentsFromPath(Arrays.asList(getPathElements(path)), hierarchy, ConfiguredObjectTypeRegistry.getCategory(clazz)); |
| if(parent == null) |
| { |
| return createFailureResponse(message, STATUS_CODE_NOT_FOUND, "The '"+OBJECT_PATH+"' "+path+" does not identify a valid parent"); |
| } |
| theParent = parent; |
| } |
| return doCreate(clazz, message, responseHeader, attributes, theParent); |
| |
| } |
| else if(_configuredObjectFinder.getHierarchy(clazz).length == 1 && attributes.containsKey(ConfiguredObject.NAME)) |
| { |
| return doCreate(clazz, message, responseHeader, attributes, _managedObject); |
| } |
| else |
| { |
| return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The '"+OBJECT_PATH+"' must be supplied"); |
| } |
| } |
| else |
| { |
| return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "Message body must be a map"); |
| } |
| |
| } |
| |
| private InternalMessage doCreate(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message, |
| final MutableMessageHeader responseHeader, |
| final Map<String, Object> attributes, |
| final ConfiguredObject<?> parent) |
| { |
| try |
| { |
| ManagedObject annotation = clazz.getAnnotation(ManagedObject.class); |
| if(!annotation.category() || !"".equals(annotation.defaultType()) || attributes.containsKey(QPID_TYPE) || _model.getTypeRegistry().getTypeSpecialisations(clazz).size()==1) |
| { |
| if(attributes.containsKey(QPID_TYPE)) |
| { |
| attributes.put(ConfiguredObject.TYPE, attributes.remove(QPID_TYPE)); |
| } |
| else |
| { |
| attributes.remove(TYPE_ATTRIBUTE); |
| } |
| |
| |
| final ConfiguredObject object = parent.createChild(ConfiguredObjectTypeRegistry.getCategory(clazz), attributes); |
| return InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, |
| _managementOutputConverter.convertToOutput(object, true)); |
| } |
| else |
| { |
| return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "type: '"+getAmqpName(clazz)+"' requires the '"+QPID_TYPE+"' attribute"); |
| } |
| } |
| catch (AbstractConfiguredObject.DuplicateNameException e) |
| { |
| return createFailureResponse(message, STATUS_CODE_CONFLICT, "Object already exists with the same path"); |
| |
| } |
| catch (IllegalArgumentException | IllegalStateException | IllegalConfigurationException e) |
| { |
| return createFailureResponse(message, STATUS_CODE_BAD_REQUEST, e.getMessage()); |
| } |
| catch (AccessControlException e) |
| { |
| return createFailureResponse(message, STATUS_CODE_FORBIDDEN, "Forbidden"); |
| } |
| } |
| |
| private InternalMessage performConfiguredObjectOperation(final Class<? extends ConfiguredObject> clazz, |
| final InternalMessage message, |
| final ConfiguredObjectOperation<ConfiguredObject<?>> method) |
| { |
| InternalMessageHeader requestHeader = message.getMessageHeader(); |
| |
| final Map<String, Object> headers = requestHeader.getHeaderMap(); |
| |
| ConfiguredObject<?> object = findObject(clazz, headers); |
| Map<String,Object> parameters = new HashMap<>(headers); |
| parameters.remove(KEY_ATTRIBUTE); |
| parameters.remove(IDENTITY_ATTRIBUTE); |
| parameters.remove(TYPE_ATTRIBUTE); |
| parameters.remove(INDEX_ATTRIBUTE); |
| parameters.remove(OPERATION_HEADER); |
| |
| Iterator<String> paramIterator = parameters.keySet().iterator(); |
| while (paramIterator.hasNext()) |
| { |
| final String paramName = paramIterator.next(); |
| if(paramName.startsWith("JMS_QPID")) |
| { |
| paramIterator.remove(); |
| } |
| |
| } |
| final MutableMessageHeader responseHeader = new MutableMessageHeader(); |
| responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null |
| ? requestHeader.getMessageId() |
| : requestHeader.getCorrelationId()); |
| responseHeader.setMessageId(UUID.randomUUID().toString()); |
| responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); |
| |
| Serializable result = (Serializable) method.perform(object, parameters); |
| if(result == null) |
| { |
| result = new byte[0]; |
| } |
| return InternalMessage.createMessage(_addressSpace.getMessageStore(), responseHeader, |
| result, false); |
| } |
| |
| String generatePath(final ConfiguredObject<?> object) |
| { |
| return _configuredObjectFinder.getPath(object); |
| } |
| |
| private ConfiguredObject<?> findObject(final Class<? extends ConfiguredObject> clazz, |
| final Map<String, Object> headers) |
| { |
| if(headers.containsKey(IDENTITY_ATTRIBUTE)) |
| { |
| Object value = headers.get(IDENTITY_ATTRIBUTE); |
| UUID id; |
| if(value instanceof UUID) |
| { |
| id= (UUID) value; |
| } |
| else if(value instanceof String) |
| { |
| id = UUID.fromString((String) value); |
| } |
| else |
| { |
| return null; |
| } |
| |
| return findObjectById(id, clazz); |
| } |
| else if(headers.containsKey(INDEX_ATTRIBUTE)) |
| { |
| Object index = headers.get(INDEX_ATTRIBUTE); |
| if(OBJECT_PATH.equals(index)) |
| { |
| return _configuredObjectFinder.findObjectFromPath(String.valueOf(headers.get(KEY_ATTRIBUTE)), clazz); |
| } |
| else |
| { |
| throw new IllegalArgumentException("Unknown index: '"+index+'"'); |
| } |
| } |
| else |
| { |
| throw new IllegalArgumentException("Either "+IDENTITY_ATTRIBUTE+" or "+INDEX_ATTRIBUTE+" must be specified"); |
| |
| } |
| } |
| |
| private ConfiguredObject<?> findObjectById(final UUID id, final Class<? extends ConfiguredObject> clazz) |
| { |
| Collection<Class<? extends ConfiguredObject>> ancestorCategories = _model.getAncestorCategories(clazz); |
| if(ancestorCategories.contains(_managedObject.getCategoryClass())) |
| { |
| return findDescendantById(clazz, id, _managedObject.getCategoryClass(), Collections.singleton(_managedObject)); |
| } |
| else |
| { |
| for(Map.Entry<Class<? extends ConfiguredObject>,ConfiguredObjectOperation<?>> entry : _associatedChildrenOperations.entrySet()) |
| { |
| if(ancestorCategories.contains(entry.getKey())) |
| { |
| @SuppressWarnings("unchecked") |
| ConfiguredObjectOperation<ConfiguredObject<?>> operation = |
| (ConfiguredObjectOperation<ConfiguredObject<?>>) entry.getValue(); |
| @SuppressWarnings("unchecked") |
| Collection<? extends ConfiguredObject> associated = |
| (Collection<? extends ConfiguredObject>) operation |
| .perform(_managedObject, |
| Collections.<String, Object>emptyMap()); |
| ConfiguredObject<?> object = findDescendantById(clazz, id, |
| entry.getKey(), |
| associated); |
| if(object != null) |
| { |
| return object; |
| } |
| } |
| } |
| } |
| return null; |
| } |
| |
| private ConfiguredObject<?> findDescendantById(final Class<? extends ConfiguredObject> category, |
| final UUID id, |
| final Class<? extends ConfiguredObject> rootCategory, |
| final Collection<? extends ConfiguredObject> roots) |
| { |
| if(category == rootCategory) |
| { |
| for(ConfiguredObject<?> root : roots) |
| { |
| if(root.getId().equals(id)) |
| { |
| return root; |
| } |
| } |
| } |
| else |
| { |
| if(_model.getChildTypes(rootCategory).contains(category)) |
| { |
| for(ConfiguredObject<?> root : roots) |
| { |
| final ConfiguredObject<?> child = root.getChildById(category, id); |
| if(child != null) |
| { |
| return child; |
| } |
| } |
| } |
| else |
| { |
| Collection<Class<? extends ConfiguredObject>> ancestorCategories = _model.getAncestorCategories(category); |
| for(Class<? extends ConfiguredObject> childClass : _model.getChildTypes(rootCategory)) |
| { |
| if(ancestorCategories.contains(childClass)) |
| { |
| List<ConfiguredObject> newRoots = new ArrayList<>(); |
| for(ConfiguredObject<?> root : roots) |
| { |
| newRoots.addAll(root.getChildren(childClass)); |
| } |
| if(!newRoots.isEmpty()) |
| { |
| final ConfiguredObject<?> child = findDescendantById(category, id, childClass, newRoots); |
| if(child != null) |
| { |
| return child; |
| } |
| } |
| } |
| } |
| } |
| } |
| return null; |
| } |
| |
| private void sendResponse(final InternalMessage message, final InternalMessage response) |
| { |
| String replyTo = message.getMessageHeader().getReplyTo(); |
| response.setInitialRoutingAddress(replyTo); |
| |
| |
| getResponseDestination(replyTo).send(response, |
| replyTo, InstanceProperties.EMPTY, |
| new AutoCommitTransaction(_addressSpace.getMessageStore()), |
| null); |
| |
| } |
| |
| private MessageDestination getResponseDestination(String replyTo) |
| { |
| ManagementNodeConsumer consumer = null; |
| Subject currentSubject = Subject.getSubject(AccessController.getContext()); |
| Set<SessionPrincipal> sessionPrincipals = currentSubject.getPrincipals(SessionPrincipal.class); |
| if (!sessionPrincipals.isEmpty()) |
| { |
| AMQSessionModel publishingSession = sessionPrincipals.iterator().next().getSession(); |
| for (ManagementNodeConsumer candidate : _consumers) |
| { |
| if (candidate.getTarget().getTargetAddress().equals(replyTo) && candidate.getSessionModel().getConnectionReference() == publishingSession.getConnectionReference()) |
| { |
| consumer = candidate; |
| break; |
| } |
| } |
| } |
| return consumer == null ? _addressSpace.getDefaultDestination() : consumer; |
| } |
| |
| |
| private InternalMessage createFailureResponse(final InternalMessage requestMessage, |
| final int statusCode, |
| final String stateDescription, |
| Object... params) |
| { |
| final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); |
| final MutableMessageHeader responseHeader = new MutableMessageHeader(); |
| responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null |
| ? requestHeader.getMessageId() |
| : requestHeader.getCorrelationId()); |
| responseHeader.setMessageId(UUID.randomUUID().toString()); |
| for(String header : requestHeader.getHeaderNames()) |
| { |
| responseHeader.setHeader(header, requestHeader.getHeader(header)); |
| } |
| responseHeader.setHeader(STATUS_CODE_HEADER, statusCode); |
| responseHeader.setHeader(STATUS_DESCRIPTION_HEADER, params.length == 0 ? stateDescription : MessageFormat.format(stateDescription, params)); |
| return InternalMessage.createBytesMessage(_addressSpace.getMessageStore(), responseHeader, new byte[0]); |
| |
| } |
| |
| private InternalMessage performManagementOperation(String operation, final InternalMessage msg) |
| { |
| final InternalMessage responseMessage; |
| final InternalMessageHeader requestHeader = msg.getMessageHeader(); |
| final MutableMessageHeader responseHeader = new MutableMessageHeader(); |
| responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null |
| ? requestHeader.getMessageId() |
| : requestHeader.getCorrelationId()); |
| responseHeader.setMessageId(UUID.randomUUID().toString()); |
| |
| Map<?, ?> result; |
| if(GET_TYPES.equals(operation)) |
| { |
| result = performGetTypes(requestHeader.getHeaderMap()); |
| } |
| else if(GET_ATTRIBUTES.equals(operation)) |
| { |
| result = performGetAttributes(requestHeader.getHeaderMap()); |
| } |
| else if(GET_OPERATIONS.equals(operation)) |
| { |
| result = performGetOperations(requestHeader.getHeaderMap()); |
| } |
| else if(QUERY.equals(operation)) |
| { |
| if(msg.getMessageBody() instanceof Map) |
| { |
| result = performQuery(requestHeader.getHeaderMap(), (Map)(msg.getMessageBody())); |
| } |
| else |
| { |
| return createFailureResponse(msg, STATUS_CODE_BAD_REQUEST, "Body of a QUERY operation must be a map"); |
| } |
| } |
| else |
| { |
| return createFailureResponse(msg, STATUS_CODE_NOT_IMPLEMENTED, "Unknown operation {}", operation); |
| } |
| responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); |
| |
| responseMessage = InternalMessage.createMapMessage(_addressSpace.getMessageStore(), responseHeader, result); |
| |
| return responseMessage; |
| } |
| |
| private Map<?, ?> performQuery(final Map<String, Object> headerMap, final Map messageBody) |
| { |
| @SuppressWarnings("unchecked") |
| List<Object> attributeNameObjects = (List<Object>)_managementInputConverter.convert(List.class, messageBody.get(ATTRIBUTE_NAMES)); |
| List<String> attributeNames; |
| if(attributeNameObjects == null) |
| { |
| attributeNames = Collections.emptyList(); |
| } |
| else |
| { |
| attributeNames = new ArrayList<>(attributeNameObjects.size()); |
| for(Object nameObject : attributeNameObjects) |
| { |
| if(nameObject == null) |
| { |
| throw new IllegalArgumentException("All attribute names must be non-null"); |
| } |
| else |
| { |
| attributeNames.add(nameObject.toString()); |
| } |
| } |
| } |
| |
| String entityType = (String)headerMap.get(ENTITY_TYPE_HEADER); |
| |
| if(attributeNames.isEmpty()) |
| { |
| attributeNames = generateAttributeNames(entityType); |
| } |
| |
| List<ConfiguredObject<?>> objects = getObjects(entityType); |
| Collections.sort(objects, OBJECT_COMPARATOR); |
| if(headerMap.containsKey(OFFSET_HEADER)) |
| { |
| int offset; |
| if(headerMap.get(OFFSET_HEADER) instanceof Number) |
| { |
| offset = ((Number) headerMap.get(OFFSET_HEADER)).intValue(); |
| } |
| else |
| { |
| offset = Integer.parseInt(headerMap.get(OFFSET_HEADER).toString()); |
| } |
| if(offset >= 0) |
| { |
| if(offset < objects.size()) |
| { |
| objects = objects.subList(offset, objects.size()); |
| } |
| else |
| { |
| objects = Collections.emptyList(); |
| } |
| } |
| else if(objects.size() + offset > 0) |
| { |
| objects = objects.subList(objects.size()+offset, objects.size()); |
| } |
| } |
| |
| if(headerMap.containsKey(COUNT_HEADER)) |
| { |
| int count; |
| if(headerMap.get(COUNT_HEADER) instanceof Number) |
| { |
| count = ((Number) headerMap.get(COUNT_HEADER)).intValue(); |
| } |
| else |
| { |
| count = Integer.parseInt(headerMap.get(OFFSET_HEADER).toString()); |
| } |
| if(count >= 0) |
| { |
| if(count < objects.size()) |
| { |
| objects = objects.subList(0, count); |
| } |
| else |
| { |
| objects = Collections.emptyList(); |
| } |
| } |
| else if(objects.size() + count > 0) |
| { |
| objects = objects.subList(0, objects.size()+count); |
| } |
| } |
| List<List<Object>> resultList = new ArrayList<>(objects.size()); |
| |
| for(ConfiguredObject<?> object : objects) |
| { |
| List<Object> attributes = new ArrayList<>(attributeNames.size()); |
| Map<?,?> convertedObject = _managementOutputConverter.convertToOutput(object, true); |
| for(String attributeName : attributeNames) |
| { |
| attributes.add(convertedObject.get(attributeName)); |
| } |
| resultList.add(attributes); |
| } |
| Map<Object, Object> result = new LinkedHashMap<>(); |
| result.put(ATTRIBUTE_NAMES, attributeNames); |
| result.put(RESULTS, resultList); |
| return result; |
| } |
| |
| private Collection<ConfiguredObject<?>> getChildrenOfType(ConfiguredObject<?> object, Class<? extends ConfiguredObject> type) |
| { |
| Set<ConfiguredObject<?>> children = new HashSet<>(); |
| Class<? extends ConfiguredObject> categoryClass = ConfiguredObjectTypeRegistry.getCategory(type); |
| for(Class<? extends ConfiguredObject> childClass : _model.getChildTypes(object.getCategoryClass())) |
| { |
| if(childClass == categoryClass) |
| { |
| for (ConfiguredObject<?> child : object.getChildren(childClass)) |
| { |
| if(categoryClass == type || child.getTypeClass() == type) |
| { |
| children.add(child); |
| } |
| } |
| } |
| else |
| { |
| if(_model.getAncestorCategories(categoryClass).contains(childClass)) |
| { |
| for(ConfiguredObject<?> child : object.getChildren(childClass)) |
| { |
| children.addAll(getChildrenOfType(child, type)); |
| } |
| } |
| } |
| } |
| return children; |
| } |
| |
| private List<ConfiguredObject<?>> getObjects(final String entityType) |
| { |
| Set<ConfiguredObject<?>> foundObjects; |
| |
| if(entityType == null) |
| { |
| foundObjects = findAllChildren(); |
| |
| } |
| else |
| { |
| final Class<? extends ConfiguredObject> type = _managedTypes.get(entityType); |
| if(type != null) |
| { |
| foundObjects = new HashSet<>(); |
| Collection<Class<? extends ConfiguredObject>> ancestorCategories = |
| _model.getAncestorCategories(ConfiguredObjectTypeRegistry.getCategory(type)); |
| if(ancestorCategories.contains(_managedObject.getCategoryClass())) |
| { |
| foundObjects.addAll(getChildrenOfType(_managedObject, type)); |
| } |
| |
| for(Map.Entry<Class<? extends ConfiguredObject>, ConfiguredObjectOperation<?>> entry : _associatedChildrenOperations.entrySet()) |
| { |
| if(ancestorCategories.contains(entry.getKey())) |
| { |
| @SuppressWarnings("unchecked") |
| final Collection<ConfiguredObject<?>> parents = getAssociatedChildren(entry.getValue(), _managedObject); |
| |
| for(ConfiguredObject<?> parent : parents) |
| { |
| foundObjects.addAll(getChildrenOfType(parent, type)); |
| } |
| } |
| } |
| } |
| else |
| { |
| throw new IllegalArgumentException("Unknown entity type: '"+entityType+"'"); |
| } |
| } |
| // TODO - get the objects |
| |
| return new ArrayList<>(foundObjects); |
| } |
| |
| private Set<ConfiguredObject<?>> findAllChildren() |
| { |
| final Set<ConfiguredObject<?>> foundObjects; |
| foundObjects = new HashSet<>(); |
| Set<ConfiguredObject<?>> parents = new HashSet<>(); |
| Set<ConfiguredObject<?>> children; |
| |
| parents.add(_managedObject); |
| for(ConfiguredObjectOperation op : _associatedChildrenOperations.values()) |
| { |
| |
| @SuppressWarnings("unchecked") |
| final Collection<ConfiguredObject<?>> associated = getAssociatedChildren(op, _managedObject); |
| |
| parents.addAll(associated); |
| } |
| foundObjects.addAll(parents); |
| do |
| { |
| children = new HashSet<>(); |
| for(ConfiguredObject<?> parent : parents) |
| { |
| for(Class<? extends ConfiguredObject> childClass : _model.getChildTypes(parent.getCategoryClass())) |
| { |
| children.addAll((Collection<? extends ConfiguredObject<?>>) parent.getChildren(childClass)); |
| } |
| } |
| parents = children; |
| |
| |
| } |
| while(foundObjects.addAll(parents)); |
| return foundObjects; |
| } |
| |
| private static <C extends ConfiguredObject<?>> Collection<ConfiguredObject<?>> getAssociatedChildren(final ConfiguredObjectOperation<C> op, final ConfiguredObject<?> managedObject) |
| { |
| @SuppressWarnings("unchecked") |
| final Collection<ConfiguredObject<?>> associated = (Collection<ConfiguredObject<?>>) op.perform((C)managedObject, Collections.<String, Object>emptyMap()); |
| return associated; |
| } |
| |
| private List<String> generateAttributeNames(String entityType) |
| { |
| Set<String> attrNameSet = new HashSet<>(); |
| List<String> attributeNames = new ArrayList<>(); |
| for(String standardAttribute : Arrays.asList(IDENTITY_ATTRIBUTE, TYPE_ATTRIBUTE, QPID_TYPE, OBJECT_PATH)) |
| { |
| attrNameSet.add(standardAttribute); |
| attributeNames.add(standardAttribute); |
| } |
| final ConfiguredObjectTypeRegistry typeRegistry = _model.getTypeRegistry(); |
| List<Class<? extends ConfiguredObject>> classes = new ArrayList<>(); |
| |
| if(entityType != null && !entityType.trim().equals("")) |
| { |
| Class<? extends ConfiguredObject> clazz = _managedTypes.get(entityType); |
| if(clazz != null) |
| { |
| classes.add(clazz); |
| if(ConfiguredObjectTypeRegistry.getCategory(clazz) == clazz) |
| { |
| classes.addAll(_model.getTypeRegistry().getTypeSpecialisations(clazz)); |
| } |
| } |
| } |
| else |
| { |
| for (Class<? extends ConfiguredObject> clazz : _managedCategories) |
| { |
| classes.add(clazz); |
| classes.addAll(_model.getTypeRegistry().getTypeSpecialisations(clazz)); |
| } |
| } |
| for(Class<? extends ConfiguredObject> clazz : classes) |
| { |
| for(String name : typeRegistry.getAttributeNames(clazz)) |
| { |
| if(!ConfiguredObject.ID.equals(name) && attrNameSet.add(name)) |
| { |
| attributeNames.add(name); |
| } |
| } |
| final Class<? extends ConfiguredObject> category = ConfiguredObjectTypeRegistry.getCategory(clazz); |
| if(category != _managedObject.getCategoryClass() |
| && !isSyntheticChildClass(category)) |
| { |
| Class<? extends ConfiguredObject> parentType = _model.getParentType(category); |
| if (parentType != _managedObject.getCategoryClass()) |
| { |
| attributeNames.add(parentType.getSimpleName().toLowerCase()); |
| } |
| } |
| |
| } |
| |
| return attributeNames; |
| } |
| |
| private <T> Map<String, T> performManagementOperation(final Map<String,Object> requestHeader, TypeOperation<T> operation, T selfValue) |
| { |
| Map<String,T> responseMap = new LinkedHashMap<>(); |
| |
| if(requestHeader.containsKey(ENTITY_TYPE_HEADER)) |
| { |
| String entityType = (String)requestHeader.get(ENTITY_TYPE_HEADER); |
| Class<? extends ConfiguredObject> clazz = _managedTypes.get(entityType); |
| if(clazz != null) |
| { |
| responseMap.put(entityType, operation.evaluateType(clazz)); |
| if(ConfiguredObjectTypeRegistry.getCategory(clazz) == clazz) |
| { |
| for(Class<? extends ConfiguredObject> type : _model.getTypeRegistry().getTypeSpecialisations(clazz)) |
| { |
| if(type.isAnnotationPresent(ManagedObject.class)) |
| { |
| responseMap.put(getAmqpName(type), operation.evaluateType(type)); |
| } |
| } |
| } |
| } |
| else if(MANAGEMENT_TYPE.equals(entityType)) |
| { |
| responseMap.put(entityType, selfValue); |
| } |
| } |
| else |
| { |
| |
| for(Map.Entry<String, Class<? extends ConfiguredObject>> entry : _managedTypes.entrySet()) |
| { |
| responseMap.put(entry.getKey(), operation.evaluateType(entry.getValue())); |
| } |
| responseMap.put(MANAGEMENT_TYPE, selfValue); |
| } |
| return responseMap; |
| } |
| |
| private Map<String,List<String>> performGetTypes(final Map<String, Object> header) |
| { |
| return performManagementOperation(header, |
| new TypeOperation<List<String>>() |
| { |
| @Override |
| public List<String> evaluateType(final Class<? extends ConfiguredObject> clazz) |
| { |
| Class<? extends ConfiguredObject> category = |
| ConfiguredObjectTypeRegistry.getCategory(clazz); |
| if(category == clazz) |
| { |
| return Collections.emptyList(); |
| } |
| else |
| { |
| return Collections.singletonList(getAmqpName(category)); |
| } |
| } |
| }, Collections.<String>emptyList()); |
| |
| } |
| |
| private Map<String,List<String>> performGetAttributes(final Map<String, Object> headers) |
| { |
| return performManagementOperation(headers, |
| new TypeOperation<List<String>>() |
| { |
| @Override |
| public List<String> evaluateType(final Class<? extends ConfiguredObject> clazz) |
| { |
| return new ArrayList<>(_model.getTypeRegistry().getAttributeNames(clazz)); |
| } |
| }, Collections.<String>emptyList()); |
| |
| } |
| |
| |
| private Map<String,Map<String,List<String>>> performGetOperations(final Map<String, Object> headers) |
| { |
| // TODO - enumerate management operations |
| final Map<String, List<String>> managementOperations = new HashMap<>(); |
| |
| return performManagementOperation(headers, |
| new TypeOperation<Map<String,List<String>>>() |
| { |
| @Override |
| public Map<String,List<String>> evaluateType(final Class<? extends ConfiguredObject> clazz) |
| { |
| final Map<String, ConfiguredObjectOperation<?>> operations = |
| _model.getTypeRegistry().getOperations(clazz); |
| Map<String,List<String>> result = new HashMap<>(); |
| for(Map.Entry<String, ConfiguredObjectOperation<?>> operation : operations.entrySet()) |
| { |
| |
| List<String> arguments = new ArrayList<>(); |
| for(OperationParameter param : operation.getValue().getParameters()) |
| { |
| arguments.add(param.getName()); |
| } |
| result.put(operation.getKey(), arguments); |
| } |
| return result; |
| } |
| }, managementOperations); |
| |
| } |
| |
| |
| @Override |
| public synchronized <T extends ConsumerTarget<T>> ManagementNodeConsumer<T> addConsumer(final T target, |
| final FilterManager filters, |
| final Class<? extends ServerMessage> messageClass, |
| final String consumerName, |
| final EnumSet<ConsumerOption> options, |
| final Integer priority) |
| { |
| |
| final ManagementNodeConsumer<T> managementNodeConsumer = new ManagementNodeConsumer<>(consumerName,this, target); |
| target.consumerAdded(managementNodeConsumer); |
| _consumers.add(managementNodeConsumer); |
| return managementNodeConsumer; |
| } |
| |
| @Override |
| public synchronized Collection<ManagementNodeConsumer> getConsumers() |
| { |
| return Collections.unmodifiableCollection(_consumers); |
| } |
| |
| @Override |
| public boolean verifySessionAccess(final AMQSessionModel<?,?> session) |
| { |
| return true; |
| } |
| |
| @Override |
| public NamedAddressSpace getAddressSpace() |
| { |
| return _addressSpace; |
| } |
| |
| |
| @Override |
| public void authorisePublish(final SecurityToken token, final Map<String, Object> arguments) |
| throws AccessControlException |
| { |
| // ? special permissions to publish to the management node |
| } |
| |
| @Override |
| public String getName() |
| { |
| return MANAGEMENT_NODE_NAME; |
| } |
| |
| @Override |
| public UUID getId() |
| { |
| return _id; |
| } |
| |
| @Override |
| public MessageDurability getMessageDurability() |
| { |
| return MessageDurability.NEVER; |
| } |
| |
| void unregisterConsumer(ManagementNodeConsumer managementNodeConsumer) |
| { |
| _consumers.remove(managementNodeConsumer); |
| } |
| |
| private class ConsumedMessageInstance implements MessageInstance |
| { |
| private final ServerMessage _message; |
| private final InstanceProperties _properties; |
| |
| ConsumedMessageInstance(final ServerMessage message, |
| final InstanceProperties properties) |
| { |
| _message = message; |
| _properties = properties; |
| } |
| |
| @Override |
| public int getDeliveryCount() |
| { |
| return 0; |
| } |
| |
| @Override |
| public void incrementDeliveryCount() |
| { |
| |
| } |
| |
| @Override |
| public void decrementDeliveryCount() |
| { |
| |
| } |
| |
| @Override |
| public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener) |
| { |
| |
| } |
| |
| @Override |
| public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener) |
| { |
| return false; |
| } |
| |
| |
| @Override |
| public boolean acquiredByConsumer() |
| { |
| return false; |
| } |
| |
| @Override |
| public MessageInstanceConsumer getAcquiringConsumer() |
| { |
| return null; |
| } |
| |
| @Override |
| public MessageEnqueueRecord getEnqueueRecord() |
| { |
| return null; |
| } |
| |
| @Override |
| public boolean isAcquiredBy(final MessageInstanceConsumer consumer) |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer) |
| { |
| return false; |
| } |
| |
| @Override |
| public void setRedelivered() |
| { |
| |
| } |
| |
| @Override |
| public boolean isRedelivered() |
| { |
| return false; |
| } |
| |
| @Override |
| public void reject() |
| { |
| |
| } |
| |
| @Override |
| public boolean isRejectedBy(final MessageInstanceConsumer consumer) |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean getDeliveredToConsumer() |
| { |
| return true; |
| } |
| |
| @Override |
| public boolean expired() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean acquire(final MessageInstanceConsumer sub) |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer) |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean makeAcquisitionStealable() |
| { |
| return false; |
| } |
| |
| @Override |
| public int getMaximumDeliveryCount() |
| { |
| return 0; |
| } |
| |
| @Override |
| public int routeToAlternate(final Action<? super MessageInstance> action, |
| final ServerTransaction txn) |
| { |
| return 0; |
| } |
| |
| |
| @Override |
| public Filterable asFilterable() |
| { |
| return null; |
| } |
| |
| @Override |
| public boolean isAvailable() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean acquire() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean isAcquired() |
| { |
| return false; |
| } |
| |
| @Override |
| public void release() |
| { |
| |
| } |
| |
| @Override |
| public void release(final MessageInstanceConsumer release) |
| { |
| |
| } |
| |
| @Override |
| public void delete() |
| { |
| |
| } |
| |
| @Override |
| public boolean isDeleted() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean isHeld() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean isPersistent() |
| { |
| return false; |
| } |
| |
| @Override |
| public ServerMessage getMessage() |
| { |
| return _message; |
| } |
| |
| @Override |
| public InstanceProperties getInstanceProperties() |
| { |
| return _properties; |
| } |
| |
| @Override |
| public TransactionLogResource getOwningResource() |
| { |
| return ManagementNode.this; |
| } |
| } |
| |
| private static class MutableMessageHeader implements AMQMessageHeader |
| { |
| private final LinkedHashMap<String, Object> _headers = new LinkedHashMap<>(); |
| private String _correlationId; |
| private String _messageId; |
| |
| void setCorrelationId(final String correlationId) |
| { |
| _correlationId = correlationId; |
| } |
| |
| void setMessageId(final String messageId) |
| { |
| _messageId = messageId; |
| } |
| |
| @Override |
| public String getCorrelationId() |
| { |
| return _correlationId; |
| } |
| |
| @Override |
| public long getExpiration() |
| { |
| return 0L; |
| } |
| |
| @Override |
| public String getUserId() |
| { |
| return null; |
| } |
| |
| @Override |
| public String getAppId() |
| { |
| return null; |
| } |
| |
| @Override |
| public String getMessageId() |
| { |
| return _messageId; |
| } |
| |
| @Override |
| public String getMimeType() |
| { |
| return null; |
| } |
| |
| @Override |
| public String getEncoding() |
| { |
| return null; |
| } |
| |
| @Override |
| public byte getPriority() |
| { |
| return 4; |
| } |
| |
| @Override |
| public long getTimestamp() |
| { |
| return 0L; |
| } |
| |
| @Override |
| public long getNotValidBefore() |
| { |
| return 0L; |
| } |
| |
| @Override |
| public String getType() |
| { |
| return null; |
| } |
| |
| @Override |
| public String getReplyTo() |
| { |
| return null; |
| } |
| |
| @Override |
| public Object getHeader(final String name) |
| { |
| return _headers.get(name); |
| } |
| |
| @Override |
| public boolean containsHeaders(final Set<String> names) |
| { |
| return _headers.keySet().containsAll(names); |
| } |
| |
| @Override |
| public boolean containsHeader(final String name) |
| { |
| return _headers.containsKey(name); |
| } |
| |
| @Override |
| public Collection<String> getHeaderNames() |
| { |
| return Collections.unmodifiableCollection(_headers.keySet()); |
| } |
| |
| void setHeader(String header, Object value) |
| { |
| _headers.put(header,value); |
| } |
| |
| } |
| |
| private interface TypeOperation<T> |
| { |
| T evaluateType(Class<? extends ConfiguredObject> operation); |
| } |
| |
| } |