blob: a6f5ab90cc5812a6e5788c5762bb6b24db5619a6 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.management.domain.services;
import java.util.UUID;
import org.apache.qpid.management.Messages;
import org.apache.qpid.management.Names;
import org.apache.qpid.management.configuration.BrokerConnectionData;
import org.apache.qpid.management.configuration.Configuration;
import org.apache.qpid.management.domain.model.DomainModel;
import org.apache.qpid.transport.util.Logger;
/**
* This is the Object representation of a management client.
* According to specification : "A software component that is separate from the messaging broker, connected to the
* management broker via an AMQP connection, which allows any software component to be managed remotely by QPID."
*
* @author Andrea Gazzarini
*/
public final class ManagementClient
{
private final static Logger LOGGER = Logger.get(ManagementClient.class);
private final String _managementQueueName;
private final String _methodReplyQueueName;
private DomainModel _domainModel;
private QpidService _service;
private final BrokerConnectionData _connectionData;
/**
* Builds a new <code>ManagementClient</code> with the given identifier and connection data.
*
* @param brokerId the broker identifier.
* @param connectionData the broker connection data (host, port, etc...)
*/
ManagementClient(UUID brokerId,BrokerConnectionData connectionData)
{
_connectionData = connectionData;
_service = new QpidService(brokerId);
_domainModel = new DomainModel(brokerId);
_managementQueueName = Configuration.getInstance().getManagementQueueName();
_methodReplyQueueName = Configuration.getInstance().getMethodReplyQueueName();
}
@Override
public String toString()
{
return _connectionData.toString();
}
/**
* Returns the connection data associated with this management client.
*
* @return the connection data associated with this management client.
*/
public BrokerConnectionData getBrokerConnectionData()
{
return _connectionData;
}
/**
* Establishing initial communication Between Client and Broker.
* According to specification :
* "Communication is established between the management client and management agent using normal AMQP procedures.
* The client creates a connection to the broker and then establishes a session with its corresponding channel.
* Two private queues are then declared.
* A management queue is declared and bound to the qpid.management exchange with "mgmt.#" as routing key; in that
* way all management-related messages sent to the exchange will be received by this client.
* When a client successfully binds to the qpid.management exchange, the management agent schedules a schema
* broadcast to be sent to the exchange.
* The agent will publish, via the exchange, a description of the schema for all manageable objects in its control. That
* schema is therefore received by this service and it will be part of service's domain model."
*
* @throws StartupFailureException when this management client cannot perform startup operations due to an error.
*/
void estabilishFirstConnectionWithBroker() throws StartupFailureException{
try {
connectWithBroker();
createAndBindMethodReplyQueue();
createAndBindManagementQueue();
registerConsumerOnManagementQueue();
registerConsumerOnMethodReplyQueue();
synchronize();
} catch(Exception exception)
{
try {
_service.close();
} catch(Exception ignore)
{
}
throw new StartupFailureException(exception);
}
}
/**
* Shutdown procedure for this management client.
*/
void shutdown ()
{
LOGGER.info(Messages.QMAN_000011_SHUTDOWN_INITIATED,_domainModel.getBrokerId());
removeMethodReplyConsumer();
destroyAndUnbingMethodReplyQueue();
removeManagementConsumer();
destroyAndUnbingManagementQueue();
_domainModel.releaseResources();
_service.close();
LOGGER.info(Messages.QMAN_000012_MANAGEMENT_CLIENT_SHUT_DOWN,_domainModel.getBrokerId());
}
/**
* Registers a consumer (that is, a listener) on the method-reply queue.
*/
private void registerConsumerOnMethodReplyQueue ()
{
BrokerMessageListener methodReplyChannelListener = new BrokerMessageListener(_domainModel);
methodReplyChannelListener.setHandlers(Configuration.getInstance().getMethodReplyQueueHandlers());
_service.createSubscription(_methodReplyQueueName, _methodReplyQueueName, methodReplyChannelListener);
LOGGER.info(Messages.QMAN_000013_METHOD_REPLY_CONSUMER_INSTALLED, _domainModel.getBrokerId());
}
/**
* Registers a consumer (listener) on the management queue.
*/
private void registerConsumerOnManagementQueue ()
{
BrokerMessageListener managementChannelListener = new BrokerMessageListener(_domainModel);
managementChannelListener.setHandlers(Configuration.getInstance().getManagementQueueHandlers());
_service.createSubscription(_managementQueueName, _managementQueueName, managementChannelListener);
LOGGER.info(Messages.QMAN_000014_MANAGEMENT_CONSUMER_INSTALLED, _domainModel.getBrokerId());
}
/**
* Declares a management queue and bound it to the "qpid.management" exchange with "mgmt.#" as routing key;
*/
private void createAndBindManagementQueue ()
{
_service.declareQueue(_managementQueueName);
_service.declareBinding(
_managementQueueName,
Names.MANAGEMENT_EXCHANGE,
Names.MANAGEMENT_ROUTING_KEY);
LOGGER.info(Messages.QMAN_000015_MANAGEMENT_QUEUE_DECLARED,_managementQueueName,_domainModel.getBrokerId());
}
/**
* Declares a private queue for receiving method replies (after method invocations).
* This queue is bound to the amq.direct exchange using a routing key equal to the name of the queue.
*/
private void createAndBindMethodReplyQueue ()
{
_service.declareQueue(_methodReplyQueueName);
_service.declareBinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName);
LOGGER.info(Messages.QMAN_000016_METHOD_REPLY_QUEUE_DECLARED,_methodReplyQueueName, _domainModel.getBrokerId());
}
/**
* Removes the method-reply queue consumer.
*/
private void removeMethodReplyConsumer()
{
_service.removeSubscription(_methodReplyQueueName);
LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_methodReplyQueueName,_domainModel.getBrokerId());
}
/**
* Unbind the method reply queue and after that destroy it from remote broker.
*/
private void destroyAndUnbingMethodReplyQueue()
{
_service.declareUnbinding(_methodReplyQueueName, Names.AMQ_DIRECT_QUEUE, _methodReplyQueueName);
_service.deleteQueue(_methodReplyQueueName);
LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED,_methodReplyQueueName,_domainModel.getBrokerId());
}
/**
* Removes the management queue consumer.
*/
private void removeManagementConsumer()
{
_service.removeSubscription(_managementQueueName);
LOGGER.info(Messages.QMAN_000017_CONSUMER_HAS_BEEN_REMOVED,_managementQueueName,_domainModel.getBrokerId());
}
/**
* Unbind the management queue and after that destroy it from remote broker.
*/
private void destroyAndUnbingManagementQueue()
{
_service.declareUnbinding(_managementQueueName, Names.MANAGEMENT_EXCHANGE, Names.MANAGEMENT_ROUTING_KEY);
_service.deleteQueue(_managementQueueName);
LOGGER.info(Messages.QMAN_000018_QUEUE_UNDECLARED, _managementQueueName,_domainModel.getBrokerId());
}
/**
* Connects this client with the broker.
*
* @throws QpidException when it's not possibile to connect with the broker.
*/
private void connectWithBroker() throws Exception
{
_service.connect();
}
/**
* All the Management client commands are asynchronous.
* Synchronous behavior is achieved through invoking the sync method.
*/
private void synchronize()
{
_service.sync();
}
}