blob: f3a7ee3fc47c3add7cae9ff88e5d3f60bec370c5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License
AMQP management client for Qpid dispatch.
import qpid_dispatch_site
import proton
from proton import Url
from .error import * # import all error symbols for convenience to users.
from .entity import EntityBase, clean_dict
from proton.utils import SyncRequestResponse, BlockingConnection
class Entity(EntityBase):
Proxy for an AMQP manageable entity.
Modifying local attributes dict will not change the remote entity. Call
update() to send the local attributes to the remote entity.
The standard AMQP requests read, update and delete are defined
here. create() is defined on L{Node}.
Attribute access:
- via index operator: entity['foo']
- as python attributes: (only if attribute name is a legal python identitfier)
@ivar attributes: Map of attribute values for this entity.
def __init__(self, node, attributes=None, **kwattrs):
super(Entity, self).__init__(attributes, **kwattrs)
self.__dict__['_node'] = node # Avoid getattr recursion
def call(self, operation, expect=OK, **arguments):
"""Call an arbitrary management method on this entity"""
request = self._node.request(
operation=operation, type=self.type, identity=self.identity, **arguments)
return, expect=expect).body
def read(self):
"""Read the remote entity attributes into the local attributes."""
self.attributes ='READ', expect=OK)
def update(self):
"""Update the remote entity attributes from the local attributes."""
self.attributes ='UPDATE', expect=OK, body=self.attributes)
def delete(self):
"""Delete the remote entity"""'DELETE', expect=NO_CONTENT)
class Node(object):
"""Client proxy for an AMQP management node"""
def connection(url=None, router=None, timeout=10, ssl_domain=None, sasl=None):
"""Return a BlockingConnection suitable for connecting to a management node
@param url: URL of the management node.
@param router: If address does not contain a path, use the management node for this router ID.
If not specified and address does not contain a path, use the default management node.
url = Url(url) # Convert string to Url class.
if url.path is None:
if router:
url.path = u'_topo/0/%s/$management' % router
url.path = u'$management'
# if sasl_mechanism is unicode, convert it to python string
return BlockingConnection(url,
allowed_mechs=str(sasl.mechs) if sasl else None,
user=str(sasl.user) if sasl else None,
password=str(sasl.password) if sasl else None)
def connect(url=None, router=None, timeout=10, ssl_domain=None, sasl=None):
"""Return a Node connected with the given parameters, see L{connection}"""
return Node(Node.connection(url, router, timeout, ssl_domain, sasl))
def __init__(self, connection, locales=None):
Create a management node proxy using the given connection.
@param locales: Default list of locales for management operations.
@param connection: a L{BlockingConnection} to the management agent.
""" = self.identity = u'self'
self.type = u'' # AMQP management node type
self.locales = locales
self.locales = locales
self.url = connection.url
self.client = SyncRequestResponse(connection, self.url.path)
self.reply_to = self.client.reply_to
def close(self):
"""Shut down the node"""
if self.client:
self.client = None
def __repr__(self):
return "%s(%s)"%(self.__class__.__name__, self.url)
def check_response(response, expect=OK):
Check a management response message for errors and correlation ID.
code ='statusCode')
if code != expect:
if 200 <= code <= 299:
raise ValueError("Response was %s(%s) but expected %s(%s): %s" % (
code, STATUS_TEXT[code], expect, STATUS_TEXT[expect],'statusDescription')))
raise ManagementError.create(code,'statusDescription'))
def request(self, body=None, **properties):
Make a L{proton.Message} containining a management request.
@param body: The request body, a dict or list.
@param properties: Keyword arguments for application-properties of the request.
@return: L{proton.Message} containining the management request.
if self.locales: properties.setdefault(u'locales', self.locales)
request = proton.Message() = clean_dict(properties)
request.body = body or {}
return request
def node_request(self, body=None, **properties):
"""Construct a request for the managment node itself"""
return self.request(body,, type=self.type, **properties)
def call(self, request, expect=OK):
Send a management request message, wait for a response.
@return: Response message.
response =
self.check_response(response, expect=expect)
return response
class QueryResponse(object):
Result returned by L{query}.
@ivar attribute_names: List of attribute names for the results.
@ivar results: list of lists of attribute values in same order as attribute_names
def __init__(self, node, attribute_names, results):
@param response: the respose message to a query.
self.node = node
self.attribute_names = attribute_names
self.results = results
def iter_dicts(self, clean=False):
Return an iterator that yields a dictionary for each result.
@param clean: if True remove any None values from returned dictionaries.
for r in self.results:
if clean: yield clean_dict(zip(self.attribute_names, r))
else: yield dict(zip(self.attribute_names, r))
def iter_entities(self, clean=False):
Return an iterator that yields an L{Entity} for each result.
@param clean: if True remove any None values from returned dictionaries.
for d in self.iter_dicts(clean=clean): yield Entity(self.node, d)
def get_dicts(self, clean=False):
"""Results as list of dicts."""
return [d for d in self.iter_dicts(clean=clean)]
def get_entities(self, clean=False):
"""Results as list of entities."""
return [d for d in self.iter_entities(clean=clean)]
def __repr__(self):
return "QueryResponse(attribute_names=%r, results=%r"%(self.attribute_names, self.results)
def query(self, type=None, attribute_names=None, offset=None, count=None):
Send an AMQP management query message and return the response.
At least one of type, attribute_names must be specified.
@keyword type: The type of entity to query.
@keyword attribute_names: A list of attribute names to query.
@keyword offset: An integer offset into the list of results to return.
@keyword count: A count of the maximum number of results to return.
@return: A L{QueryResponse}
request = self.node_request(
{u'attributeNames': attribute_names or []},
operation=u'QUERY', entityType=type, offset=offset, count=count)
response =
return Node.QueryResponse(self, response.body[u'attributeNames'], response.body[u'results'])
def create(self, attributes=None, type=None, name=None):
Create an entity.
type and name can be specified in the attributes.
@param attributes: Attributes for the new entity.
@param type: Type of entity to create.
@param name: Name for the new entity.
@return: Entity proxy for the new entity.
attributes = attributes or {}
type = type or attributes.get(u'type')
name = name or attributes.get(u'name')
request = self.request(operation=u'CREATE', type=type, name=name, body=attributes)
return Entity(self,, expect=CREATED).body)
def read(self, type=None, name=None, identity=None):
Read an AMQP entity.
If both name and identity are specified, only identity is used.
@param type: Entity type.
@param name: Entity name.
@param identity: Entity identity.
@return: An L{Entity}
if name and identity: name = None # Only specify one
request = self.request(operation=u'READ', type=type, name=name, identity=identity)
return Entity(self,
def update(self, attributes, type=None, name=None, identity=None):
Update an entity with attributes.
type, name and identity can be specified in the attributes.
If both name and identity are specified, only identity is used.
@param attributes: Attributes for the new entity.
@param type: Entity type.
@param name: Entity name.
@param identity: Entity identity.
@return: L{Entity} for the updated entity.
attributes = attributes or {}
type = type or attributes.get(u'type')
name = name or attributes.get(u'name')
identity = identity or attributes.get(u'identity')
if name and identity: name = None # Only send one
request = self.request(operation=U'UPDATE', type=type, name=name,
identity=identity, body=attributes)
return Entity(self,
def delete(self, type=None, name=None, identity=None):
Delete the remote entity.
If both name and identity are specified, only identity is used.
@param type: Entity type.
@param name: Entity name.
@param identity: Entity identity.
if name and identity: name = None # Only specify one
request = self.request(operation=U'DELETE', type=type, name=name,
identity=identity), expect=NO_CONTENT)
def get_types(self, type=None):
return"GET-TYPES", entityType=type)).body
def get_annotations(self, type=None):
return"GET-ANNOTATIONS", entityType=type)).body
def get_attributes(self, type=None):
return"GET-ATTRIBUTES", entityType=type)).body
def get_operations(self, type=None):
return"GET-OPERATIONS", entityType=type)).body
def get_mgmt_nodes(self, type=None):
return"GET-MGMT-NODES", entityType=type)).body
def get_log(self, limit=None, type=None):
return"GET-LOG", entityType=type, limit=limit)).body