| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.management.wsdm.capabilities; |
| |
| import java.lang.management.ManagementFactory; |
| import java.lang.reflect.Method; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.management.InstanceNotFoundException; |
| import javax.management.MBeanServer; |
| import javax.management.Notification; |
| import javax.management.NotificationFilter; |
| import javax.management.NotificationListener; |
| import javax.management.ObjectName; |
| import javax.xml.namespace.QName; |
| |
| import org.apache.muse.core.AbstractCapability; |
| import org.apache.muse.core.Resource; |
| import org.apache.muse.core.ResourceManager; |
| import org.apache.muse.core.routing.MessageHandler; |
| import org.apache.muse.core.serializer.SerializerRegistry; |
| import org.apache.muse.ws.addressing.EndpointReference; |
| import org.apache.muse.ws.addressing.soap.SoapFault; |
| import org.apache.muse.ws.notification.NotificationProducer; |
| import org.apache.muse.ws.notification.WsnConstants; |
| import org.apache.qpid.management.Messages; |
| import org.apache.qpid.management.Names; |
| import org.apache.qpid.management.configuration.Configuration; |
| import org.apache.qpid.management.jmx.EntityLifecycleNotification; |
| import org.apache.qpid.management.wsdm.common.ThreadSessionManager; |
| import org.apache.qpid.management.wsdm.common.UnableToConnectWithBrokerFault; |
| import org.apache.qpid.management.wsdm.muse.engine.WSDMAdapterEnvironment; |
| import org.apache.qpid.management.wsdm.muse.serializer.ByteArraySerializer; |
| import org.apache.qpid.management.wsdm.notifications.LifeCycleEvent; |
| import org.apache.qpid.transport.util.Logger; |
| |
| /** |
| * QMan Adapter capability. |
| * Basically it acts as a lifecycle manager of all ws resource that correspond to entities on JMX side. |
| * |
| * @author Andrea Gazzarini |
| */ |
| @SuppressWarnings("serial") |
| public class QManAdapterCapability extends AbstractCapability |
| { |
| private final static Logger LOGGER = Logger.get(QManAdapterCapability.class); |
| |
| private MBeanServer _mxServer; |
| private WsArtifactsFactory _artifactsFactory; |
| private URI _resourceURI; |
| private NotificationProducer _publisherCapability; |
| private ThreadPoolExecutor _workManager; |
| private Map<String, QName> _lifeCycleTopics = new HashMap<String, QName>(); |
| |
| /** |
| * Runnable wrapper used for sending asynchronous |
| * notifications. |
| * |
| * @author Andrea Gazzarini |
| */ |
| private final class AsynchNotificationTask implements Runnable |
| { |
| private final QName topicName; |
| private final LifeCycleEvent event; |
| |
| AsynchNotificationTask(QName tName, LifeCycleEvent evt) |
| { |
| topicName = tName; |
| event = evt; |
| } |
| |
| public void run() |
| { |
| try |
| { |
| _publisherCapability.publish(topicName,event); |
| } catch (SoapFault exception) |
| { |
| LOGGER.error( |
| exception, |
| Messages.QMAN_100038_UNABLE_TO_SEND_WS_NOTIFICATION); |
| } |
| } |
| }; |
| |
| /** |
| * NotificationFilter for "create" only events. |
| */ |
| private final NotificationFilter _filterForNewInstances = new NotificationFilter(){ |
| |
| /** |
| * Returns true when the notification is related to a creation of a new instance. |
| * |
| * @return true when the notification is related to a creation of a new instance. |
| */ |
| public boolean isNotificationEnabled(Notification notification) |
| { |
| return EntityLifecycleNotification.INSTANCE_ADDED_NOTIFICATION_TYPE.equals(notification.getType()); |
| } |
| |
| }; |
| |
| /** |
| * NotificationFilter for "remove" only events. |
| */ |
| private final NotificationFilter _filterForRemovedInstances = new NotificationFilter(){ |
| |
| /** |
| * Returns true when the notification is related to a deletion of an existing instance. |
| * |
| * @return true when the notification is related to a deletion of an existing instance. |
| */ |
| public boolean isNotificationEnabled(Notification notification) |
| { |
| return EntityLifecycleNotification.INSTANCE_REMOVED_NOTIFICATION_TYPE.equals(notification.getType()); |
| } |
| }; |
| |
| /** |
| * This listener handles "create" mbean events and therefore provides procedure to create and initialize |
| * corresponding ws resources. |
| */ |
| private final NotificationListener _listenerForNewInstances = new NotificationListener() |
| { |
| /** |
| * Handles JMX "create" notification type. |
| * |
| * @param notification the entity lifecycle notification. |
| * @param data user data associated with the incoming notifiication : it is not used at the moment. |
| */ |
| public void handleNotification(Notification notification, Object data) |
| { |
| ObjectName eventSourceName = null; |
| try |
| { |
| EntityLifecycleNotification lifecycleNotification = (EntityLifecycleNotification) notification; |
| eventSourceName = lifecycleNotification.getObjectName(); |
| |
| ThreadSessionManager.getInstance().getSession().setObjectName(eventSourceName); |
| |
| LOGGER.debug(Messages.QMAN_200039_DEBUG_JMX_NOTIFICATION, notification); |
| |
| ResourceManager resourceManager = getResource().getResourceManager(); |
| Resource resource = resourceManager.createResource(Names.QMAN_RESOURCE_NAME); |
| |
| WsArtifacts artifacts = _artifactsFactory.getArtifactsFor(resource,eventSourceName); |
| MBeanCapability capability = _artifactsFactory.createCapability( |
| artifacts.getCapabilityClass(), |
| eventSourceName); |
| |
| ThreadSessionManager.getInstance().getSession().setWsdlDocument(artifacts.getWsdl()); |
| ThreadSessionManager.getInstance().getSession().setResourceMetadataDescriptor(artifacts.getResourceMetadataDescriptor()); |
| |
| resource.setWsdlPortType(Names.QMAN_RESOURCE_PORT_TYPE_NAME); |
| capability.setCapabilityURI(Names.NAMESPACE_URI+"/"+capability.getClass().getSimpleName()); |
| capability.setMessageHandlers(createMessageHandlers(capability)); |
| |
| resource.addCapability(capability); |
| resource.initialize(); |
| resourceManager.addResource(resource.getEndpointReference(), resource); |
| |
| LOGGER.info( |
| Messages.QMAN_000030_RESOURCE_HAS_BEEN_CREATED, |
| eventSourceName); |
| |
| AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask( |
| getTopicName(lifecycleNotification.getClassKind()), |
| LifeCycleEvent.newCreateEvent( |
| eventSourceName.getKeyProperty(Names.OBJECT_ID), |
| lifecycleNotification.getPackageName(), |
| lifecycleNotification.getClassName())); |
| |
| _workManager.execute(asynchNotificationTask); |
| |
| } catch (ArtifactsNotAvailableException exception) |
| { |
| LOGGER.error( |
| exception, |
| Messages.QMAN_100023_BUILD_WS_ARTIFACTS_FAILURE); |
| } catch (IllegalAccessException exception) |
| { |
| LOGGER.error( |
| exception, |
| Messages.QMAN_100024_CAPABILITY_INSTANTIATION_FAILURE, |
| eventSourceName); |
| } catch (InstantiationException exception) |
| { |
| LOGGER.error( |
| exception, |
| Messages.QMAN_100024_CAPABILITY_INSTANTIATION_FAILURE, |
| eventSourceName); |
| } catch (SoapFault exception) |
| { |
| LOGGER.error( |
| exception,Messages.QMAN_100025_WSRF_FAILURE, |
| eventSourceName); |
| } catch (Exception exception) |
| { |
| LOGGER.error( |
| exception, |
| Messages.QMAN_100025_WSRF_FAILURE, |
| eventSourceName); |
| } |
| } |
| }; |
| |
| /** |
| * This listener handles "remove" mbean events and therefore provides procedure to shutdown and remove |
| * corresponding ws resources. |
| */ |
| private final NotificationListener _listenerForRemovedInstances = new NotificationListener() |
| { |
| /** |
| * Handles JMX "remove" notification type. |
| * |
| * @param notification the entity lifecycle notification. |
| * @param data user data associated with the incoming notifiication : it is not used at the moment. |
| */ |
| public void handleNotification(Notification notification, Object data) |
| { |
| EntityLifecycleNotification lifecycleNotification = (EntityLifecycleNotification) notification; |
| ObjectName eventSourceName = lifecycleNotification.getObjectName(); |
| |
| LOGGER.debug(Messages.QMAN_200042_REMOVING_RESOURCE, eventSourceName); |
| |
| EndpointReference endpointPointReference = new EndpointReference(_resourceURI); |
| endpointPointReference.addParameter( |
| Names.RESOURCE_ID_QNAME, |
| eventSourceName.getKeyProperty(Names.OBJECT_ID)); |
| |
| ResourceManager resourceManager = getResource().getResourceManager(); |
| try |
| { |
| Resource resource = resourceManager.getResource(endpointPointReference); |
| resource.shutdown(); |
| |
| LOGGER.info( |
| Messages.QMAN_000031_RESOURCE_HAS_BEEN_REMOVED, |
| eventSourceName); |
| |
| AsynchNotificationTask asynchNotificationTask = new AsynchNotificationTask( |
| getTopicName(lifecycleNotification.getClassKind()), |
| LifeCycleEvent.newRemoveEvent( |
| eventSourceName.getKeyProperty(Names.OBJECT_ID), |
| lifecycleNotification.getPackageName(), |
| lifecycleNotification.getClassName())); |
| |
| _workManager.execute(asynchNotificationTask); |
| |
| } |
| catch(Exception exception) |
| { |
| LOGGER.error( |
| exception, |
| Messages.QMAN_100027_RESOURCE_SHUTDOWN_FAILURE, |
| eventSourceName); |
| } |
| } |
| }; |
| |
| /** |
| * Initializes this capability. |
| * |
| * @throws SoapFault when the initialization fails.. |
| */ |
| @Override |
| public void initialize() throws SoapFault |
| { |
| super.initialize(); |
| |
| registerByteArraySerializer(); |
| |
| createLifeCycleTopics(); |
| |
| initializeWorkManager(); |
| |
| createQManResourceURI(); |
| |
| _mxServer = ManagementFactory.getPlatformMBeanServer(); |
| _artifactsFactory = new WsArtifactsFactory(getEnvironment(),_mxServer); |
| |
| registerQManLifecycleListeners(); |
| } |
| |
| /** |
| * Connects QMan with a broker with the given connection data. |
| * |
| * @param host the host where the broker is running. |
| * @param port the port number where the broker is running. |
| * @param username username for estabilshing connection. |
| * @param password password for estabilshing connection. |
| * @param virtualHost the virtualHost name. |
| * @param initialPoolCapacity the initial size of broker connection pool. |
| * @param maxPoolCapacity the max allowed size of broker connection pool. |
| * @param maxWaitTimeout the max wait timeout for retrieving connections. |
| * @throws SoapFault when the connection with broker cannot be estabilished. |
| */ |
| @SuppressWarnings("unchecked") |
| public void connect( |
| String host, |
| int port, |
| String username, |
| String password, |
| String virtualHost, |
| int initialPoolCapacity, |
| int maxPoolCapacity, |
| long maxWaitTimeout) throws SoapFault |
| { |
| try |
| { |
| _mxServer.invoke( |
| Names.QMAN_OBJECT_NAME, |
| "addBroker", |
| new Object[]{host,port,username,password,virtualHost,initialPoolCapacity,maxPoolCapacity,maxWaitTimeout}, |
| new String[]{ |
| String.class.getName(), |
| int.class.getName(), |
| String.class.getName(), |
| String.class.getName(), |
| String.class.getName(), |
| int.class.getName(), |
| int.class.getName(), |
| long.class.getName()}); |
| } catch(Exception exception) |
| { |
| LOGGER.error(Messages.QMAN_100017_UNABLE_TO_CONNECT,host,port); |
| throw new UnableToConnectWithBrokerFault( |
| getResource().getEndpointReference(), |
| host, |
| port, |
| username, |
| virtualHost, |
| exception.getMessage()); |
| } |
| } |
| |
| /** |
| * Creates the message handlers for the given capability. |
| * |
| * @param capability the QMan capability. |
| * @return a collection with message handlers for the given capability. |
| */ |
| protected Collection<MessageHandler> createMessageHandlers(MBeanCapability capability) |
| { |
| Collection<MessageHandler> handlers = new ArrayList<MessageHandler>(); |
| |
| for (Method method : capability.getClass().getDeclaredMethods()) |
| { |
| String name = method.getName(); |
| |
| QName requestName = new QName( |
| Names.NAMESPACE_URI, |
| name, |
| Names.PREFIX); |
| |
| QName returnValueName = new QName( |
| Names.NAMESPACE_URI, |
| name+"Response", |
| Names.PREFIX); |
| |
| String actionURI = Names.NAMESPACE_URI+"/"+name; |
| |
| MessageHandler handler = new QManMessageHandler( |
| actionURI, |
| requestName, |
| returnValueName); |
| |
| handler.setMethod(method); |
| handlers.add(handler); |
| } |
| return handlers; |
| } |
| |
| /** |
| * Returns the publisher capability associated with the owner resource. |
| * |
| * @return the publisher capability associated with the owner resource. |
| */ |
| NotificationProducer getPublisherCapability() |
| { |
| return (NotificationProducer) getResource().getCapability(WsnConstants.PRODUCER_URI); |
| } |
| |
| /** |
| * Creates events & objects lifecycle topic that will be used to publish lifecycle event |
| * messages.. |
| */ |
| void createLifeCycleTopics() |
| { |
| try |
| { |
| _publisherCapability = getPublisherCapability(); |
| |
| _publisherCapability.addTopic(Names.EVENTS_LIFECYLE_TOPIC_NAME); |
| _lifeCycleTopics.put(Names.EVENT,Names.EVENTS_LIFECYLE_TOPIC_NAME); |
| |
| LOGGER.info( |
| Messages.QMAN_000032_EVENTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, |
| Names.OBJECTS_LIFECYLE_TOPIC_NAME); |
| |
| _publisherCapability.addTopic(Names.OBJECTS_LIFECYLE_TOPIC_NAME); |
| _lifeCycleTopics.put(Names.CLASS,Names.OBJECTS_LIFECYLE_TOPIC_NAME); |
| |
| LOGGER.info( |
| Messages.QMAN_000033_OBJECTS_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, |
| Names.OBJECTS_LIFECYLE_TOPIC_NAME); |
| |
| _publisherCapability.addTopic(Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME); |
| LOGGER.info( |
| Messages.QMAN_000034_UNCLASSIFIED_LIFECYCLE_TOPIC_HAS_BEEN_CREATED, |
| Names.OBJECTS_LIFECYLE_TOPIC_NAME); |
| } catch(Exception exception) |
| { |
| LOGGER.error(exception, Messages.QMAN_100036_TOPIC_DECLARATION_FAILURE); |
| } |
| } |
| |
| /** |
| * Starting from an object type (i.e. event or class) returns the name of the |
| * corresponding topic where the lifecycle message must be published. |
| * Note that if the given object type is unknown then the "Unclassified Object Types" topic |
| * will be returned (and therefore the message will be published there). |
| * |
| * @param objectType the type of the object. |
| * @return the name of the topic associated with the given object type. |
| */ |
| QName getTopicName(String objectType) |
| { |
| QName topicName = _lifeCycleTopics.get(objectType); |
| return (topicName != null) |
| ? topicName |
| : Names.UNKNOWN_OBJECT_TYPE_LIFECYLE_TOPIC_NAME; |
| } |
| |
| /** |
| * Workaround : it seems that is not possibile to declare a serializer |
| * for a byte array using muse descriptor... |
| * What is the stringified name of the class? |
| * byte[].getClass().getName() is [B but is not working (ClassNotFound). |
| * So, at the end, this is hard-coded here! |
| */ |
| private void registerByteArraySerializer() |
| { |
| SerializerRegistry.getInstance().registerSerializer( |
| byte[].class, |
| new ByteArraySerializer()); |
| } |
| |
| /** |
| * Creates the URI that will be later used to identify a QMan WS-Resource. |
| * Note that the resources that will be created are identified also with their resource id. |
| * Briefly we could say that this is the soap:address of the WS-Resource definition. |
| * |
| * @throws SoapFault when the URI cannot be built (probably it is malformed). |
| */ |
| private void createQManResourceURI() throws SoapFault |
| { |
| WSDMAdapterEnvironment environment = (WSDMAdapterEnvironment) getEnvironment(); |
| String resourceURI = environment.getDefaultURIPrefix()+Names.QMAN_RESOURCE_NAME; |
| try |
| { |
| _resourceURI = URI.create(resourceURI); |
| |
| } catch(IllegalArgumentException exception) |
| { |
| LOGGER.info( |
| exception, |
| Messages.QMAN_100029_MALFORMED_RESOURCE_URI_FAILURE, |
| resourceURI); |
| throw new SoapFault(exception); |
| } |
| } |
| |
| /** |
| * Initializes the work manager used for asynchronous notifications. |
| */ |
| private void initializeWorkManager() |
| { |
| Configuration configuration = Configuration.getInstance(); |
| _workManager = new ThreadPoolExecutor( |
| configuration.getWorkerManagerPoolSize(), |
| configuration.getWorkerManagerMaxPoolSize(), |
| configuration.getWorkerManagerKeepAliveTime(), |
| TimeUnit.MILLISECONDS, |
| new ArrayBlockingQueue<Runnable>(30)); |
| } |
| |
| /** |
| * This adapter capability needs to be an event listener of QMan JMX core |
| * in order to detect relevant lifecycle events and therefore create WS artifacts & notification(s). |
| * |
| * @throws SoapFault when it's not possible to register event listener : is QMan running? |
| */ |
| @SuppressWarnings("serial") |
| private void registerQManLifecycleListeners() throws SoapFault |
| { |
| try |
| { |
| _mxServer.addNotificationListener( |
| Names.QMAN_OBJECT_NAME, |
| _listenerForNewInstances, |
| _filterForNewInstances, |
| null); |
| |
| _mxServer.addNotificationListener( |
| Names.QMAN_OBJECT_NAME, |
| _listenerForRemovedInstances, |
| _filterForRemovedInstances, |
| null); |
| |
| try |
| { |
| _mxServer.addNotificationListener( |
| Names.QPID_EMULATOR_OBJECT_NAME, |
| _listenerForNewInstances, |
| _filterForNewInstances, null); |
| |
| _mxServer.addNotificationListener( |
| Names.QPID_EMULATOR_OBJECT_NAME, |
| _listenerForRemovedInstances, |
| _filterForRemovedInstances, null); |
| |
| } catch (Exception exception) |
| { |
| LOGGER.info(Messages.QMAN_000028_TEST_MODULE_NOT_FOUND); |
| } |
| } catch(InstanceNotFoundException exception) |
| { |
| throw new SoapFault(exception); |
| } |
| } |
| } |