blob: 9227835b3fdd97669494260aec591e61f618e437 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
import os
import platform
import time
import datetime
import Queue
from logging import getLogger
from threading import Thread, Event
from threading import RLock
from threading import currentThread
from threading import Condition
from qpid.messaging import Connection, Message, Empty, SendError
from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType,
QmfData, QmfAddress, SchemaClass, SchemaClassId,
SchemaEventClass, SchemaObjectClass, WorkItem,
SchemaMethod, QmfEvent, timedelta_to_secs)
# global flag that indicates which thread (if any) is
# running the console notifier callback
_callback_thread=None
log = getLogger("qmf")
trace = getLogger("qmf.console")
##==============================================================================
## Console Transaction Management
##
## At any given time, a console application may have multiple outstanding
## message transactions with agents. The following objects allow the console
## to track these outstanding transactions.
##==============================================================================
class _Mailbox(object):
"""
Virtual base class for all Mailbox-like objects.
"""
def __init__(self, console):
self.console = console
self.cid = 0
self.console._add_mailbox(self)
def get_address(self):
return self.cid
def deliver(self, data):
"""
Invoked by Console Management thread when a message arrives for
this mailbox.
"""
raise Exception("_Mailbox deliver() method must be provided")
def destroy(self):
"""
Release the mailbox. Once called, the mailbox should no longer be
referenced.
"""
self.console._remove_mailbox(self.cid)
class _SyncMailbox(_Mailbox):
"""
A simple mailbox that allows a consumer to wait for delivery of data.
"""
def __init__(self, console):
"""
Invoked by application thread.
"""
super(_SyncMailbox, self).__init__(console)
self._cv = Condition()
self._data = []
self._waiting = False
def deliver(self, data):
"""
Drop data into the mailbox, waking any waiters if necessary.
Invoked by Console Management thread only.
"""
self._cv.acquire()
try:
self._data.append(data)
# if was empty, notify waiters
if len(self._data) == 1:
self._cv.notify()
finally:
self._cv.release()
def fetch(self, timeout=None):
"""
Get one data item from a mailbox, with timeout.
Invoked by application thread.
"""
self._cv.acquire()
try:
if len(self._data) == 0:
self._cv.wait(timeout)
if len(self._data):
return self._data.pop(0)
return None
finally:
self._cv.release()
class _AsyncMailbox(_Mailbox):
"""
A Mailbox for asynchronous delivery, with a timeout value.
"""
def __init__(self, console,
_timeout=None):
"""
Invoked by application thread.
"""
super(_AsyncMailbox, self).__init__(console)
self.console = console
if _timeout is None:
_timeout = console._reply_timeout
self.expiration_date = (datetime.datetime.utcnow() +
datetime.timedelta(seconds=_timeout))
console._lock.acquire()
try:
console._async_mboxes[self.cid] = self
console._next_mbox_expire = None
finally:
console._lock.release()
# now that an async mbox has been created, wake the
# console mgmt thread so it will know about the mbox expiration
# date (and adjust its idle sleep period correctly)
console._wake_thread()
def reset_timeout(self, _timeout=None):
""" Reset the expiration date for this mailbox.
"""
if _timeout is None:
_timeout = self.console._reply_timeout
self.console._lock.acquire()
try:
self.expiration_date = (datetime.datetime.utcnow() +
datetime.timedelta(seconds=_timeout))
self.console._next_mbox_expire = None
finally:
self.console._lock.release()
# wake the console mgmt thread so it will learn about the mbox
# expiration date (and adjust its idle sleep period correctly)
self.console._wake_thread()
def deliver(self, msg):
"""
"""
raise Exception("deliver() method must be provided")
def expire(self):
raise Exception("expire() method must be provided")
def destroy(self):
self.console._lock.acquire()
try:
if self.cid in self.console._async_mboxes:
del self.console._async_mboxes[self.cid]
finally:
self.console._lock.release()
super(_AsyncMailbox, self).destroy()
class _QueryMailbox(_AsyncMailbox):
"""
A mailbox used for asynchronous query requests.
"""
def __init__(self, console,
agent_name,
context,
target,
_timeout=None):
"""
Invoked by application thread.
"""
super(_QueryMailbox, self).__init__(console,
_timeout)
self.agent_name = agent_name
self.target = target
self.context = context
self.result = []
def deliver(self, reply):
"""
Process query response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
trace.debug("Delivering to query mailbox (agent=%s)." % self.agent_name)
objects = reply.content
if isinstance(objects, type([])):
# convert from map to native types if needed
if self.target == QmfQuery.TARGET_SCHEMA_ID:
for sid_map in objects:
self.result.append(SchemaClassId.from_map(sid_map))
elif self.target == QmfQuery.TARGET_SCHEMA:
for schema_map in objects:
# extract schema id, convert based on schema type
sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
if sid_map:
sid = SchemaClassId.from_map(sid_map)
if sid:
if sid.get_type() == SchemaClassId.TYPE_DATA:
schema = SchemaObjectClass.from_map(schema_map)
else:
schema = SchemaEventClass.from_map(schema_map)
self.console._add_schema(schema) # add to schema cache
self.result.append(schema)
elif self.target == QmfQuery.TARGET_OBJECT:
for obj_map in objects:
# @todo: need the agent name - ideally from the
# reply message iself.
agent = self.console.get_agent(self.agent_name)
if agent:
obj = QmfConsoleData(map_=obj_map, agent=agent)
# start fetch of schema if not known
sid = obj.get_schema_class_id()
if sid:
self.console._prefetch_schema(sid, agent)
self.result.append(obj)
else:
# no conversion needed.
self.result += objects
if not "partial" in reply.properties:
# log.error("QUERY COMPLETE for %s" % str(self.context))
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
self.console._work_q_put = True
self.destroy()
def expire(self):
trace.debug("Expiring query mailbox (agent=%s)." % self.agent_name)
# send along whatever (possibly none) has been received so far
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
self.console._work_q_put = True
self.destroy()
class _SchemaPrefetchMailbox(_AsyncMailbox):
"""
Handles responses to schema fetches made by the console.
"""
def __init__(self, console,
schema_id,
_timeout=None):
"""
Invoked by application thread.
"""
super(_SchemaPrefetchMailbox, self).__init__(console,
_timeout)
self.schema_id = schema_id
def deliver(self, reply):
"""
Process schema response messages.
"""
trace.debug("Delivering schema mailbox (id=%s)." % self.schema_id)
done = False
schemas = reply.content
if schemas and isinstance(schemas, type([])):
for schema_map in schemas:
# extract schema id, convert based on schema type
sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
if sid_map:
sid = SchemaClassId.from_map(sid_map)
if sid:
if sid.get_type() == SchemaClassId.TYPE_DATA:
schema = SchemaObjectClass.from_map(schema_map)
else:
schema = SchemaEventClass.from_map(schema_map)
self.console._add_schema(schema) # add to schema cache
self.destroy()
def expire(self):
trace.debug("Expiring schema mailbox (id=%s)." % self.schema_id)
self.destroy()
class _MethodMailbox(_AsyncMailbox):
"""
A mailbox used for asynchronous method requests.
"""
def __init__(self, console,
context,
_timeout=None):
"""
Invoked by application thread.
"""
super(_MethodMailbox, self).__init__(console,
_timeout)
self.context = context
def deliver(self, reply):
"""
Process method response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
trace.debug("Delivering to method mailbox.")
_map = reply.content
if not _map or not isinstance(_map, type({})):
log.error("Invalid method call reply message")
result = None
else:
error=_map.get(SchemaMethod.KEY_ERROR)
if error:
error = QmfData.from_map(error)
result = MethodResult(_error=error)
else:
result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
# create workitem
wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result)
self.console._work_q.put(wi)
self.console._work_q_put = True
self.destroy()
def expire(self):
"""
The mailbox expired without receiving a reply.
Invoked by the Console Management thread only.
"""
trace.debug("Expiring method mailbox.")
# send along an empty response
wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None)
self.console._work_q.put(wi)
self.console._work_q_put = True
self.destroy()
class _SubscriptionMailbox(_AsyncMailbox):
"""
A Mailbox for a single subscription. Allows only sychronous "subscribe"
and "refresh" requests.
"""
def __init__(self, console, context, agent, duration, interval):
"""
Invoked by application thread.
"""
super(_SubscriptionMailbox, self).__init__(console, duration)
self.cv = Condition()
self.data = []
self.result = []
self.context = context
self.duration = duration
self.interval = interval
self.agent_name = agent.get_name()
self.agent_subscription_id = None # from agent
def subscribe(self, query):
agent = self.console.get_agent(self.agent_name)
if not agent:
log.warning("subscribed failed - unknown agent '%s'" %
self.agent_name)
return False
try:
trace.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
agent._send_subscribe_req(query, self.get_address(), self.interval,
self.duration)
except SendError, e:
log.error(str(e))
return False
return True
def resubscribe(self):
agent = self.console.get_agent(self.agent_name)
if not agent:
log.warning("resubscribed failed - unknown agent '%s'",
self.agent_name)
return False
try:
trace.debug("Sending resubscribe to Agent %s", self.agent_name)
agent._send_resubscribe_req(self.get_address(),
self.agent_subscription_id)
except SendError, e:
log.error(str(e))
return False
return True
def deliver(self, msg):
"""
"""
opcode = msg.properties.get("qmf.opcode")
if (opcode == OpCode.subscribe_rsp):
error = msg.content.get("_error")
if error:
try:
e_map = QmfData.from_map(error)
except TypeError:
log.warning("Invalid QmfData map received: '%s'"
% str(error))
e_map = QmfData.create({"error":"Unknown error"})
sp = SubscribeParams(None, None, None, e_map)
else:
self.agent_subscription_id = msg.content.get("_subscription_id")
self.duration = msg.content.get("_duration", self.duration)
self.interval = msg.content.get("_interval", self.interval)
self.reset_timeout(self.duration)
sp = SubscribeParams(self.get_address(),
self.interval,
self.duration,
None)
self.cv.acquire()
try:
self.data.append(sp)
# if was empty, notify waiters
if len(self.data) == 1:
self.cv.notify()
finally:
self.cv.release()
return
# else: data indication
agent_name = msg.properties.get("qmf.agent")
if not agent_name:
log.warning("Ignoring data_ind - no agent name given: %s" %
msg)
return
agent = self.console.get_agent(agent_name)
if not agent:
log.warning("Ignoring data_ind - unknown agent '%s'" %
agent_name)
return
objects = msg.content
for obj_map in objects:
obj = QmfConsoleData(map_=obj_map, agent=agent)
# start fetch of schema if not known
sid = obj.get_schema_class_id()
if sid:
self.console._prefetch_schema(sid, agent)
self.result.append(obj)
if not "partial" in msg.properties:
wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result)
self.result = []
self.console._work_q.put(wi)
self.console._work_q_put = True
def fetch(self, timeout=None):
"""
Get one data item from a mailbox, with timeout.
Invoked by application thread.
"""
self.cv.acquire()
try:
if len(self.data) == 0:
self.cv.wait(timeout)
if len(self.data):
return self.data.pop(0)
return None
finally:
self.cv.release()
def expire(self):
""" The subscription expired.
"""
self.destroy()
class _AsyncSubscriptionMailbox(_SubscriptionMailbox):
"""
A Mailbox for a single subscription. Allows only asychronous "subscribe"
and "refresh" requests.
"""
def __init__(self, console, context, agent, duration, interval):
"""
Invoked by application thread.
"""
super(_AsyncSubscriptionMailbox, self).__init__(console, context,
agent, duration,
interval)
self.subscribe_pending = False
def subscribe(self, query, reply_timeout):
if super(_AsyncSubscriptionMailbox, self).subscribe(query):
self.subscribe_pending = True
self.reset_timeout(reply_timeout)
return True
return False
def deliver(self, msg):
"""
"""
super(_AsyncSubscriptionMailbox, self).deliver(msg)
sp = self.fetch(0)
if sp and self.subscribe_pending:
wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, self.context, sp)
self.console._work_q.put(wi)
self.console._work_q_put = True
self.subscribe_pending = False
if not sp.succeeded():
self.destroy()
def expire(self):
""" Either the subscription expired, or a request timedout.
"""
if self.subscribe_pending:
wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, self.context, None)
self.console._work_q.put(wi)
self.console._work_q_put = True
self.destroy()
##==============================================================================
## DATA MODEL
##==============================================================================
class QmfConsoleData(QmfData):
"""
Console's representation of an managed QmfData instance.
"""
def __init__(self, map_, agent):
super(QmfConsoleData, self).__init__(_map=map_,
_const=True)
self._agent = agent
def get_timestamps(self):
"""
Returns a list of timestamps describing the lifecycle of
the object. All timestamps are represented by the AMQP
timestamp type. [0] = time of last update from Agent,
[1] = creation timestamp
[2] = deletion timestamp, or zero if not
deleted.
"""
return [self._utime, self._ctime, self._dtime]
def get_create_time(self):
"""
returns the creation timestamp
"""
return self._ctime
def get_update_time(self):
"""
returns the update timestamp
"""
return self._utime
def get_delete_time(self):
"""
returns the deletion timestamp, or zero if not yet deleted.
"""
return self._dtime
def is_deleted(self):
"""
True if deletion timestamp not zero.
"""
return self._dtime != long(0)
def refresh(self, _reply_handle=None, _timeout=None):
"""
request that the Agent update the value of this object's
contents.
"""
if _reply_handle is not None:
log.error(" ASYNC REFRESH TBD!!!")
return None
assert self._agent
assert self._agent._console
if _timeout is None:
_timeout = self._agent._console._reply_timeout
# create query to agent using this objects ID
query = QmfQuery.create_id_object(self.get_object_id(),
self.get_schema_class_id())
obj_list = self._agent._console.do_query(self._agent, query,
_timeout=_timeout)
if obj_list is None or len(obj_list) != 1:
return None
self._update(obj_list[0])
return self
def invoke_method(self, name, _in_args={}, _reply_handle=None,
_timeout=None):
"""
Invoke the named method on this object.
"""
assert self._agent
assert self._agent._console
oid = self.get_object_id()
if oid is None:
raise ValueError("Cannot invoke methods on unmanaged objects.")
if _timeout is None:
_timeout = self._agent._console._reply_timeout
if _reply_handle is not None:
mbox = _MethodMailbox(self._agent._console,
_reply_handle)
else:
mbox = _SyncMailbox(self._agent._console)
cid = mbox.get_address()
_map = {self.KEY_OBJECT_ID:str(oid),
SchemaMethod.KEY_NAME:name}
sid = self.get_schema_class_id()
if sid:
_map[self.KEY_SCHEMA_ID] = sid.map_encode()
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args
trace.debug("Sending method req to Agent (%s)" % time.time())
try:
self._agent._send_method_req(_map, cid)
except SendError, e:
log.error(str(e))
mbox.destroy()
return None
if _reply_handle is not None:
return True
trace.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
mbox.destroy()
if not replyMsg:
trace.debug("Agent method req wait timed-out.")
return None
_map = replyMsg.content
if not _map or not isinstance(_map, type({})):
log.error("Invalid method call reply message")
return None
error=_map.get(SchemaMethod.KEY_ERROR)
if error:
return MethodResult(_error=QmfData.from_map(error))
else:
return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
def _update(self, newer):
super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes,
_tag=newer._tag, _object_id=newer._object_id,
_ctime=newer._ctime, _utime=newer._utime,
_dtime=newer._dtime,
_schema_id=newer._schema_id, _const=True)
class QmfLocalData(QmfData):
"""
Console's representation of an unmanaged QmfData instance. There
is no remote agent associated with this instance. The Console has
full control over this instance.
"""
def __init__(self, values, _subtypes={}, _tag=None, _object_id=None,
_schema=None):
# timestamp in millisec since epoch UTC
ctime = long(time.time() * 1000)
super(QmfLocalData, self).__init__(_values=values,
_subtypes=_subtypes, _tag=_tag,
_object_id=_object_id,
_schema=_schema, _ctime=ctime,
_utime=ctime, _const=False)
class Agent(object):
"""
A local representation of a remote agent managed by this console.
"""
def __init__(self, name, console):
"""
@type name: string
@param name: uniquely identifies this agent in the AMQP domain.
"""
if not isinstance(console, Console):
raise TypeError("parameter must be an instance of class Console")
self._name = name
self._address = QmfAddress.direct(name, console._domain)
self._console = console
self._sender = None
self._packages = {} # map of {package-name:[list of class-names], } for this agent
self._subscriptions = [] # list of active standing subscriptions for this agent
self._announce_timestamp = None # datetime when last announce received
trace.debug( "Created Agent with address: [%s]" % self._address )
def get_name(self):
return self._name
def is_active(self):
return self._announce_timestamp != None
def _send_msg(self, msg, correlation_id=None):
"""
Low-level routine to asynchronously send a message to this agent.
"""
msg.reply_to = str(self._console._address)
if correlation_id:
msg.correlation_id = str(correlation_id)
# TRACE
#log.error("!!! Console %s sending to agent %s (%s)" %
# (self._console._name, self._name, str(msg)))
self._sender.send(msg)
# return handle
def get_packages(self):
"""
Return a list of the names of all packages known to this agent.
"""
return self._packages.keys()
def get_classes(self):
"""
Return a dictionary [key:class] of classes known to this agent.
"""
return self._packages.copy()
def get_objects(self, query, kwargs={}):
"""
Return a list of objects that satisfy the given query.
@type query: dict, or common.Query
@param query: filter for requested objects
@type kwargs: dict
@param kwargs: ??? used to build match selector and query ???
@rtype: list
@return: list of matching objects, or None.
"""
pass
def get_object(self, query, kwargs={}):
"""
Get one object - query is expected to match only one object.
??? Recommended: explicit timeout param, default None ???
@type query: dict, or common.Query
@param query: filter for requested objects
@type kwargs: dict
@param kwargs: ??? used to build match selector and query ???
@rtype: qmfConsole.ObjectProxy
@return: one matching object, or none
"""
pass
def create_subscription(self, query):
"""
Factory for creating standing subscriptions based on a given query.
@type query: common.Query object
@param query: determines the list of objects for which this subscription applies
@rtype: qmfConsole.Subscription
@returns: an object representing the standing subscription.
"""
pass
def invoke_method(self, name, _in_args={}, _reply_handle=None,
_timeout=None):
"""
Invoke the named method on this agent.
"""
assert self._console
if _timeout is None:
_timeout = self._console._reply_timeout
if _reply_handle is not None:
mbox = _MethodMailbox(self._console,
_reply_handle)
else:
mbox = _SyncMailbox(self._console)
cid = mbox.get_address()
_map = {SchemaMethod.KEY_NAME:name}
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy()
trace.debug("Sending method req to Agent (%s)" % time.time())
try:
self._send_method_req(_map, cid)
except SendError, e:
log.error(str(e))
mbox.destroy()
return None
if _reply_handle is not None:
return True
trace.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
mbox.destroy()
if not replyMsg:
trace.debug("Agent method req wait timed-out.")
return None
_map = replyMsg.content
if not _map or not isinstance(_map, type({})):
log.error("Invalid method call reply message")
return None
return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
_error=_map.get(SchemaMethod.KEY_ERROR))
def enable_events(self):
raise Exception("enable_events tbd")
def disable_events(self):
raise Exception("disable_events tbd")
def destroy(self):
raise Exception("destroy tbd")
def __repr__(self):
return str(self._address)
def __str__(self):
return self.__repr__()
def _send_query(self, query, correlation_id=None):
"""
"""
msg = Message(id=QMF_APP_ID,
properties={"method":"request",
"qmf.opcode":OpCode.query_req},
content=query.map_encode())
self._send_msg( msg, correlation_id )
def _send_method_req(self, mr_map, correlation_id=None):
"""
"""
msg = Message(id=QMF_APP_ID,
properties={"method":"request",
"qmf.opcode":OpCode.method_req},
content=mr_map)
self._send_msg( msg, correlation_id )
def _send_subscribe_req(self, query, correlation_id, _interval=None,
_lifetime=None):
"""
"""
sr_map = {"_query":query.map_encode()}
if _interval is not None:
sr_map["_interval"] = _interval
if _lifetime is not None:
sr_map["_duration"] = _lifetime
msg = Message(id=QMF_APP_ID,
properties={"method":"request",
"qmf.opcode":OpCode.subscribe_req},
content=sr_map)
self._send_msg(msg, correlation_id)
def _send_resubscribe_req(self, correlation_id,
subscription_id):
"""
"""
sr_map = {"_subscription_id":subscription_id}
msg = Message(id=QMF_APP_ID,
properties={"method":"request",
"qmf.opcode":OpCode.subscribe_refresh_ind},
content=sr_map)
self._send_msg(msg, correlation_id)
def _send_unsubscribe_ind(self, correlation_id, subscription_id):
"""
"""
sr_map = {"_subscription_id":subscription_id}
msg = Message(id=QMF_APP_ID,
properties={"method":"request",
"qmf.opcode":OpCode.subscribe_cancel_ind},
content=sr_map)
self._send_msg(msg, correlation_id)
##==============================================================================
## METHOD CALL
##==============================================================================
class MethodResult(object):
def __init__(self, _out_args=None, _error=None):
self._error = _error
self._out_args = _out_args
def succeeded(self):
return self._error is None
def get_exception(self):
return self._error
def get_arguments(self):
return self._out_args
def get_argument(self, name):
arg = None
if self._out_args:
arg = self._out_args.get(name)
return arg
##==============================================================================
## SUBSCRIPTION
##==============================================================================
class SubscribeParams(object):
""" Represents a standing subscription for this console.
"""
def __init__(self, sid, interval, duration, _error=None):
self._sid = sid
self._interval = interval
self._duration = duration
self._error = _error
def succeeded(self):
return self._error is None
def get_error(self):
return self._error
def get_subscription_id(self):
return self._sid
def get_publish_interval(self):
return self._interval
def get_duration(self):
return self._duration
##==============================================================================
## CONSOLE
##==============================================================================
class Console(Thread):
"""
A Console manages communications to a collection of agents on behalf of an application.
"""
def __init__(self, name=None, _domain=None, notifier=None,
reply_timeout = 60,
# agent_timeout = 120,
agent_timeout = 60,
kwargs={}):
"""
@type name: str
@param name: identifier for this console. Must be unique.
@type notifier: qmfConsole.Notifier
@param notifier: invoked when events arrive for processing.
@type kwargs: dict
@param kwargs: ??? Unused
"""
Thread.__init__(self)
self._operational = False
self._ready = Event()
if not name:
self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
else:
self._name = str(name)
self._domain = _domain
self._address = QmfAddress.direct(self._name, self._domain)
self._notifier = notifier
self._lock = RLock()
self._conn = None
self._session = None
# dict of "agent-direct-address":class Agent entries
self._agent_map = {}
self._direct_recvr = None
self._announce_recvr = None
self._locate_sender = None
self._schema_cache = {}
self._pending_schema_req = []
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
self._subscribe_timeout = 300 # @todo: parameterize
self._next_agent_expire = None
self._next_mbox_expire = None
# for passing WorkItems to the application
self._work_q = Queue.Queue()
self._work_q_put = False
# Correlation ID and mailbox storage
self._correlation_id = long(time.time()) # pseudo-randomize
self._post_office = {} # indexed by cid
self._async_mboxes = {} # indexed by cid, used to expire them
def destroy(self, timeout=None):
"""
Must be called before the Console is deleted.
Frees up all resources and shuts down all background threads.
@type timeout: float
@param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
"""
trace.debug("Destroying Console...")
if self._conn:
self.remove_connection(self._conn, timeout)
trace.debug("Console Destroyed")
def add_connection(self, conn):
"""
Add a AMQP connection to the console. The console will setup a session over the
connection. The console will then broadcast an Agent Locate Indication over
the session in order to discover present agents.
@type conn: qpid.messaging.Connection
@param conn: the connection to the AMQP messaging infrastructure.
"""
if self._conn:
raise Exception( "Multiple connections per Console not supported." );
self._conn = conn
self._session = conn.session(name=self._name)
# for messages directly addressed to me
self._direct_recvr = self._session.receiver(str(self._address) +
";{create:always,"
" node:"
" {type:topic,"
" x-declare:"
" {type:direct}}}",
capacity=1)
trace.debug("my direct addr=%s" % self._direct_recvr.source)
self._direct_sender = self._session.sender(str(self._address.get_node()) +
";{create:always,"
" node:"
" {type:topic,"
" x-declare:"
" {type:direct}}}")
trace.debug("my direct sender=%s" % self._direct_sender.target)
# for receiving "broadcast" messages from agents
default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#",
self._domain)
self._topic_recvr = self._session.receiver(str(default_addr) +
";{create:always,"
" node:{type:topic}}",
capacity=1)
trace.debug("default topic recv addr=%s" % self._topic_recvr.source)
# for sending to topic subscribers
topic_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND, self._domain)
self._topic_sender = self._session.sender(str(topic_addr) +
";{create:always,"
" node:{type:topic}}")
trace.debug("default topic send addr=%s" % self._topic_sender.target)
#
# Now that receivers are created, fire off the receive thread...
#
self._operational = True
self.start()
self._ready.wait(10)
if not self._ready.isSet():
raise Exception("Console managment thread failed to start.")
def remove_connection(self, conn, timeout=None):
"""
Remove an AMQP connection from the console. Un-does the add_connection() operation,
and releases any agents and sessions associated with the connection.
@type conn: qpid.messaging.Connection
@param conn: connection previously added by add_connection()
"""
if self._conn and conn and conn != self._conn:
log.error( "Attempt to delete unknown connection: %s" % str(conn))
# tell connection thread to shutdown
self._operational = False
if self.isAlive():
# kick my thread to wake it up
self._wake_thread()
trace.debug("waiting for console receiver thread to exit")
self.join(timeout)
if self.isAlive():
log.error( "Console thread '%s' is hung..." % self.getName() )
self._direct_recvr.close()
self._direct_sender.close()
self._topic_recvr.close()
self._topic_sender.close()
self._session.close()
self._session = None
self._conn = None
trace.debug("console connection removal complete")
def get_address(self):
"""
The AMQP address this Console is listening to.
"""
return self._address
def destroy_agent( self, agent ):
"""
Undoes create.
"""
if not isinstance(agent, Agent):
raise TypeError("agent must be an instance of class Agent")
self._lock.acquire()
try:
if agent._name in self._agent_map:
del self._agent_map[agent._name]
finally:
self._lock.release()
def find_agent(self, name, timeout=None ):
"""
Given the name of a particular agent, return an instance of class Agent
representing that agent. Return None if the agent does not exist.
"""
self._lock.acquire()
try:
agent = self._agent_map.get(name)
if agent:
return agent
finally:
self._lock.release()
# agent not present yet - ping it with an agent_locate
mbox = _SyncMailbox(self)
cid = mbox.get_address()
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
msg = Message(id=QMF_APP_ID,
subject="console.ind.locate." + name,
properties={"method":"request",
"qmf.opcode":OpCode.agent_locate_req},
content=query._predicate)
msg.content_type="amqp/list"
msg.reply_to = str(self._address)
msg.correlation_id = str(cid)
trace.debug("%s Sending Agent Locate (%s)", self._name, str(msg))
try:
self._topic_sender.send(msg)
except SendError, e:
log.error(str(e))
mbox.destroy()
return None
if timeout is None:
timeout = self._reply_timeout
new_agent = None
trace.debug("Waiting for response to Agent Locate (%s)" % timeout)
mbox.fetch(timeout)
mbox.destroy()
trace.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
new_agent = self._agent_map.get(name)
finally:
self._lock.release()
return new_agent
def get_agents(self):
"""
Return the list of known agents.
"""
self._lock.acquire()
try:
agents = self._agent_map.values()
finally:
self._lock.release()
return agents
def get_agent(self, name):
"""
Return the named agent, else None if not currently available.
"""
self._lock.acquire()
try:
agent = self._agent_map.get(name)
finally:
self._lock.release()
return agent
def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
"""
"""
target = query.get_target()
if _reply_handle is not None:
mbox = _QueryMailbox(self,
agent.get_name(),
_reply_handle,
target,
_timeout)
else:
mbox = _SyncMailbox(self)
cid = mbox.get_address()
try:
trace.debug("Sending Query to Agent (%s)" % time.time())
agent._send_query(query, cid)
except SendError, e:
log.error(str(e))
mbox.destroy()
return None
# return now if async reply expected
if _reply_handle is not None:
return True
if not _timeout:
_timeout = self._reply_timeout
trace.debug("Waiting for response to Query (%s)" % _timeout)
now = datetime.datetime.utcnow()
expire = now + datetime.timedelta(seconds=_timeout)
response = []
while (expire > now):
_timeout = timedelta_to_secs(expire - now)
reply = mbox.fetch(_timeout)
if not reply:
trace.debug("Query wait timed-out.")
break
objects = reply.content
if not objects or not isinstance(objects, type([])):
break
# convert from map to native types if needed
if target == QmfQuery.TARGET_SCHEMA_ID:
for sid_map in objects:
response.append(SchemaClassId.from_map(sid_map))
elif target == QmfQuery.TARGET_SCHEMA:
for schema_map in objects:
# extract schema id, convert based on schema type
sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
if sid_map:
sid = SchemaClassId.from_map(sid_map)
if sid:
if sid.get_type() == SchemaClassId.TYPE_DATA:
schema = SchemaObjectClass.from_map(schema_map)
else:
schema = SchemaEventClass.from_map(schema_map)
self._add_schema(schema) # add to schema cache
response.append(schema)
elif target == QmfQuery.TARGET_OBJECT:
for obj_map in objects:
obj = QmfConsoleData(map_=obj_map, agent=agent)
# start fetch of schema if not known
sid = obj.get_schema_class_id()
if sid:
self._prefetch_schema(sid, agent)
response.append(obj)
else:
# no conversion needed.
response += objects
if not "partial" in reply.properties:
# reply not broken up over multiple msgs
break
now = datetime.datetime.utcnow()
mbox.destroy()
return response
def create_subscription(self, agent, query, console_handle,
_interval=None, _duration=None,
_blocking=True, _timeout=None):
if not _duration:
_duration = self._subscribe_timeout
if _timeout is None:
_timeout = self._reply_timeout
if not _blocking:
mbox = _AsyncSubscriptionMailbox(self, console_handle, agent,
_duration, _interval)
if not mbox.subscribe(query, _timeout):
mbox.destroy()
return False
return True
else:
mbox = _SubscriptionMailbox(self, console_handle, agent, _duration,
_interval)
if not mbox.subscribe(query):
mbox.destroy()
return None
trace.debug("Waiting for response to subscription (%s)" % _timeout)
# @todo: what if mbox expires here?
sp = mbox.fetch(_timeout)
if not sp:
trace.debug("Subscription request wait timed-out.")
mbox.destroy()
return None
if not sp.succeeded():
mbox.destroy()
return sp
def refresh_subscription(self, subscription_id,
_duration=None,
_timeout=None):
if _timeout is None:
_timeout = self._reply_timeout
mbox = self._get_mailbox(subscription_id)
if not mbox:
log.warning("Subscription %s not found." % subscription_id)
return None
if isinstance(mbox, _AsyncSubscriptionMailbox):
return mbox.resubscribe()
else:
# synchronous - wait for reply
if not mbox.resubscribe():
# @todo ???? mbox.destroy()
return None
# wait for reply
trace.debug("Waiting for response to subscription (%s)" % _timeout)
sp = mbox.fetch(_timeout)
if not sp:
trace.debug("re-subscribe request wait timed-out.")
# @todo???? mbox.destroy()
return None
return sp
def cancel_subscription(self, subscription_id):
"""
"""
mbox = self._get_mailbox(subscription_id)
if not mbox:
return
agent = self.get_agent(mbox.agent_name)
if agent:
try:
trace.debug("Sending UnSubscribe to Agent (%s)" % time.time())
agent._send_unsubscribe_ind(subscription_id,
mbox.agent_subscription_id)
except SendError, e:
log.error(str(e))
mbox.destroy()
def _wake_thread(self):
"""
Make the console management thread loop wakeup from its next_receiver
sleep.
"""
trace.debug("Sending noop to wake up [%s]" % self._address)
msg = Message(id=QMF_APP_ID,
subject=self._name,
properties={"method":"indication",
"qmf.opcode":OpCode.noop},
content={})
try:
self._direct_sender.send( msg, sync=True )
except SendError, e:
log.error(str(e))
def run(self):
"""
Console Management Thread main loop.
Handles inbound messages, agent discovery, async mailbox timeouts.
"""
global _callback_thread
self._ready.set()
while self._operational:
# qLen = self._work_q.qsize()
while True:
try:
msg = self._topic_recvr.fetch(timeout=0)
except Empty:
break
# TRACE:
# log.error("!!! Console %s: msg on %s [%s]" %
# (self._name, self._topic_recvr.source, msg))
self._dispatch(msg, _direct=False)
while True:
try:
msg = self._direct_recvr.fetch(timeout = 0)
except Empty:
break
# TRACE
#log.error("!!! Console %s: msg on %s [%s]" %
# (self._name, self._direct_recvr.source, msg))
self._dispatch(msg, _direct=True)
self._expire_agents() # check for expired agents
self._expire_mboxes() # check for expired async mailbox requests
#if qLen == 0 and self._work_q.qsize() and self._notifier:
if self._work_q_put and self._notifier:
# new stuff on work queue, kick the the application...
self._work_q_put = False
_callback_thread = currentThread()
trace.debug("Calling console notifier.indication")
self._notifier.indication()
_callback_thread = None
# wait for a message to arrive, or an agent
# to expire, or a mailbox requrest to time out
now = datetime.datetime.utcnow()
next_expire = self._next_agent_expire
self._lock.acquire()
try:
# the mailbox expire flag may be cleared by the
# app thread(s) to force an immedate mailbox scan
if self._next_mbox_expire is None:
next_expire = now
elif self._next_mbox_expire < next_expire:
next_expire = self._next_mbox_expire
finally:
self._lock.release()
timeout = timedelta_to_secs(next_expire - now)
if self._operational and timeout > 0.0:
try:
trace.debug("waiting for next rcvr (timeout=%s)..." % timeout)
self._session.next_receiver(timeout = timeout)
except Empty:
pass
trace.debug("Shutting down Console thread")
def get_objects(self,
_object_id=None,
_schema_id=None,
_pname=None, _cname=None,
_agents=None,
_timeout=None):
"""
Retrieve objects by id or schema.
By object_id: must specify schema_id or pname & cname if object defined
by a schema. Undescribed objects: only object_id needed.
By schema: must specify schema_id or pname & cname - all instances of
objects defined by that schema are returned.
"""
if _agents is None:
# use copy of current agent list
self._lock.acquire()
try:
agent_list = self._agent_map.values()
finally:
self._lock.release()
elif isinstance(_agents, Agent):
agent_list = [_agents]
else:
agent_list = _agents
# @todo validate this list!
if _timeout is None:
_timeout = self._reply_timeout
# @todo: fix when async do_query done - query all agents at once, then
# wait for replies, instead of per-agent querying....
obj_list = []
expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)
for agent in agent_list:
if not agent.is_active():
continue
now = datetime.datetime.utcnow()
if now >= expired:
break
if _pname is None:
if _object_id:
query = QmfQuery.create_id_object(_object_id,
_schema_id)
else:
if _schema_id is not None:
t_params = {QmfData.KEY_SCHEMA_ID: _schema_id}
else:
t_params = None
query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
t_params)
timeout = timedelta_to_secs(expired - now)
reply = self.do_query(agent, query, _timeout=timeout)
if reply:
obj_list = obj_list + reply
else:
# looking up by package name (and maybe class name), need to
# find all schema_ids in that package, then lookup object by
# schema_id
if _cname is not None:
pred = [QmfQuery.AND,
[QmfQuery.EQ,
SchemaClassId.KEY_PACKAGE,
[QmfQuery.QUOTE, _pname]],
[QmfQuery.EQ, SchemaClassId.KEY_CLASS,
[QmfQuery.QUOTE, _cname]]]
else:
pred = [QmfQuery.EQ,
SchemaClassId.KEY_PACKAGE,
[QmfQuery.QUOTE, _pname]]
query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred)
timeout = timedelta_to_secs(expired - now)
sid_list = self.do_query(agent, query, _timeout=timeout)
if sid_list:
for sid in sid_list:
now = datetime.datetime.utcnow()
if now >= expired:
break
if _object_id is not None:
query = QmfQuery.create_id_object(_object_id, sid)
else:
t_params = {QmfData.KEY_SCHEMA_ID: sid}
query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
timeout = timedelta_to_secs(expired - now)
reply = self.do_query(agent, query, _timeout=timeout)
if reply:
obj_list = obj_list + reply
if obj_list:
return obj_list
return None
# called by run() thread ONLY
#
def _dispatch(self, msg, _direct=True):
"""
PRIVATE: Process a message received from an Agent
"""
trace.debug( "Message received from Agent! [%s]", msg )
opcode = msg.properties.get("qmf.opcode")
if not opcode:
log.error("Ignoring unrecognized message '%s'", msg)
return
version = 2 # @todo: fix me
cmap = {}; props = {}
if msg.content_type == "amqp/map":
cmap = msg.content
if msg.properties:
props = msg.properties
if opcode == OpCode.agent_heartbeat_ind:
self._handle_agent_ind_msg( msg, cmap, version, _direct )
elif opcode == OpCode.agent_locate_rsp:
self._handle_agent_ind_msg( msg, cmap, version, _direct )
elif msg.correlation_id:
self._handle_response_msg(msg, cmap, version, _direct)
elif opcode == OpCode.data_ind:
self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
trace.debug("No-op msg received.")
else:
log.warning("Ignoring message with unrecognized 'opcode' value: '%s'", opcode)
def _handle_agent_ind_msg(self, msg, cmap, version, direct):
"""
Process a received agent-ind message. This message may be a response to a
agent-locate, or it can be an unsolicited agent announce.
"""
trace.debug("%s _handle_agent_ind_msg '%s'", self._name, str(msg))
try:
tmp = QmfData.from_map(msg.content)
except:
log.warning("%s invalid Agent Indication msg format '%s'",
self._name, str(msg))
return
try:
name = tmp.get_value("_name")
except:
log.warning("Bad Agent ind msg received: %s", str(msg))
return
correlated = False
if msg.correlation_id:
mbox = self._get_mailbox(msg.correlation_id)
correlated = mbox is not None
agent = None
self._lock.acquire()
try:
agent = self._agent_map.get(name)
if agent:
# agent already known, just update timestamp
agent._announce_timestamp = datetime.datetime.utcnow()
finally:
self._lock.release()
if not agent:
# need to create and add a new agent?
matched = False
if self._agent_discovery_filter:
matched = self._agent_discovery_filter.evaluate(tmp)
if (correlated or matched):
agent = self._create_agent(name)
if not agent:
return # failed to add agent
agent._announce_timestamp = datetime.datetime.utcnow()
if matched:
# unsolicited, but newly discovered
trace.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
self._work_q.put(wi)
self._work_q_put = True
if correlated:
# wake up all waiters
trace.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
trace.debug("%s _handle_response_msg '%s'", self._name, str(msg))
mbox = self._get_mailbox(msg.correlation_id)
if not mbox:
log.warning("%s Response msg received with unknown correlation_id"
" msg='%s'", self._name, str(msg))
return
# wake up all waiters
trace.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
def _handle_indication_msg(self, msg, cmap, version, _direct):
aname = msg.properties.get("qmf.agent")
if not aname:
trace.debug("No agent name field in indication message.")
return
content_type = msg.properties.get("qmf.content")
if (content_type != ContentType.event or
not isinstance(msg.content, type([]))):
log.warning("Bad event indication message received: '%s'", msg)
return
emap = msg.content[0]
if not isinstance(emap, type({})):
trace.debug("Invalid event body in indication message: '%s'", msg)
return
agent = None
self._lock.acquire()
try:
agent = self._agent_map.get(aname)
finally:
self._lock.release()
if not agent:
trace.debug("Agent '%s' not known." % aname)
return
try:
# @todo: schema???
event = QmfEvent.from_map(emap)
except TypeError:
trace.debug("Invalid QmfEvent map received: %s" % str(emap))
return
# @todo: schema? Need to fetch it, but not from this thread!
# This thread can not pend on a request.
trace.debug("Publishing event received from agent %s" % aname)
wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
{"agent":agent,
"event":event})
self._work_q.put(wi)
self._work_q_put = True
def _expire_mboxes(self):
"""
Check all async mailboxes for outstanding requests that have expired.
"""
self._lock.acquire()
try:
now = datetime.datetime.utcnow()
if self._next_mbox_expire and now < self._next_mbox_expire:
return
expired_mboxes = []
self._next_mbox_expire = None
for mbox in self._async_mboxes.itervalues():
if now >= mbox.expiration_date:
expired_mboxes.append(mbox)
else:
if (self._next_mbox_expire is None or
mbox.expiration_date < self._next_mbox_expire):
self._next_mbox_expire = mbox.expiration_date
for mbox in expired_mboxes:
del self._async_mboxes[mbox.cid]
finally:
self._lock.release()
for mbox in expired_mboxes:
# note: expire() may deallocate the mbox, so don't touch
# it further.
mbox.expire()
def _expire_agents(self):
"""
Check for expired agents and issue notifications when they expire.
"""
now = datetime.datetime.utcnow()
if self._next_agent_expire and now < self._next_agent_expire:
return
lifetime_delta = datetime.timedelta(seconds = self._agent_timeout)
next_expire_delta = lifetime_delta
self._lock.acquire()
try:
trace.debug("!!! expiring agents '%s'" % now)
for agent in self._agent_map.itervalues():
if agent._announce_timestamp:
agent_deathtime = agent._announce_timestamp + lifetime_delta
if agent_deathtime <= now:
trace.debug("AGENT_DELETED for %s" % agent)
agent._announce_timestamp = None
wi = WorkItem(WorkItem.AGENT_DELETED, None,
{"agent":agent})
# @todo: remove agent from self._agent_map
self._work_q.put(wi)
self._work_q_put = True
else:
if (agent_deathtime - now) < next_expire_delta:
next_expire_delta = agent_deathtime - now
self._next_agent_expire = now + next_expire_delta
trace.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
finally:
self._lock.release()
def _create_agent( self, name ):
"""
Factory to create/retrieve an agent for this console
"""
trace.debug("creating agent %s" % name)
self._lock.acquire()
try:
agent = self._agent_map.get(name)
if agent:
return agent
agent = Agent(name, self)
try:
agent._sender = self._session.sender(str(agent._address) +
";{create:always,"
" node:"
" {type:topic,"
" x-declare:"
" {type:direct}}}")
except:
log.warning("Unable to create sender for %s" % name)
return None
trace.debug("created agent sender %s" % agent._sender.target)
self._agent_map[name] = agent
finally:
self._lock.release()
# new agent - query for its schema database for
# seeding the schema cache (@todo)
# query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None})
# agent._sendQuery( query )
return agent
def enable_agent_discovery(self, _query=None):
"""
Called to enable the asynchronous Agent Discovery process.
Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
"""
# @todo: fix - take predicate only, not entire query!
if _query is not None:
if (not isinstance(_query, QmfQuery) or
_query.get_target() != QmfQuery.TARGET_AGENT):
raise TypeError("Type QmfQuery with target == TARGET_AGENT expected")
self._agent_discovery_filter = _query
else:
# create a match-all agent query (no predicate)
self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT)
def disable_agent_discovery(self):
"""
Called to disable the async Agent Discovery process enabled by
calling enableAgentDiscovery()
"""
self._agent_discovery_filter = None
def get_workitem_count(self):
"""
Returns the count of pending WorkItems that can be retrieved.
"""
return self._work_q.qsize()
def get_next_workitem(self, timeout=None):
"""
Returns the next pending work item, or None if none available.
@todo: subclass and return an Empty event instead.
"""
try:
wi = self._work_q.get(True, timeout)
except Queue.Empty:
return None
return wi
def release_workitem(self, wi):
"""
Return a WorkItem to the Console when it is no longer needed.
@todo: call Queue.task_done() - only 2.5+
@type wi: class qmfConsole.WorkItem
@param wi: work item object to return.
"""
pass
def _add_schema(self, schema):
"""
@todo
"""
if not isinstance(schema, SchemaClass):
raise TypeError("SchemaClass type expected")
self._lock.acquire()
try:
sid = schema.get_class_id()
if not self._schema_cache.has_key(sid):
self._schema_cache[sid] = schema
if sid in self._pending_schema_req:
self._pending_schema_req.remove(sid)
finally:
self._lock.release()
def _prefetch_schema(self, schema_id, agent):
"""
Send an async request for the schema identified by schema_id if the
schema is not available in the cache.
"""
need_fetch = False
self._lock.acquire()
try:
if ((not self._schema_cache.has_key(schema_id)) and
schema_id not in self._pending_schema_req):
self._pending_schema_req.append(schema_id)
need_fetch = True
finally:
self._lock.release()
if need_fetch:
mbox = _SchemaPrefetchMailbox(self, schema_id)
query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
trace.debug("Sending Schema Query to Agent (%s)" % time.time())
try:
agent._send_query(query, mbox.get_address())
except SendError, e:
log.error(str(e))
mbox.destroy()
self._lock.acquire()
try:
self._pending_schema_req.remove(schema_id)
finally:
self._lock.release()
def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
"""
Find the schema identified by schema_id. If not in the cache, ask the
agent for it.
"""
if not isinstance(schema_id, SchemaClassId):
raise TypeError("SchemaClassId type expected")
self._lock.acquire()
try:
schema = self._schema_cache.get(schema_id)
if schema:
return schema
finally:
self._lock.release()
if _agent is None:
return None
# note: do_query will add the new schema to the cache automatically.
slist = self.do_query(_agent,
QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
_timeout=_timeout)
if slist:
return slist[0]
else:
return None
def _add_mailbox(self, mbox):
"""
Add a mailbox to the post office, and assign it a unique address.
"""
self._lock.acquire()
try:
mbox.cid = self._correlation_id
self._correlation_id += 1
self._post_office[mbox.cid] = mbox
finally:
self._lock.release()
def _get_mailbox(self, mid):
try:
mid = long(mid)
except TypeError:
log.error("Invalid mailbox id: %s" % str(mid))
return None
self._lock.acquire()
try:
return self._post_office.get(mid)
finally:
self._lock.release()
def _remove_mailbox(self, mid):
""" Remove a mailbox and its address from the post office """
try:
mid = long(mid)
except TypeError:
log.error("Invalid mailbox id: %s" % str(mid))
return None
self._lock.acquire()
try:
if mid in self._post_office:
del self._post_office[mid]
finally:
self._lock.release()
def __repr__(self):
return str(self._address)
# def get_packages(self):
# plist = []
# for i in range(self.impl.packageCount()):
# plist.append(self.impl.getPackageName(i))
# return plist
# def get_classes(self, package, kind=CLASS_OBJECT):
# clist = []
# for i in range(self.impl.classCount(package)):
# key = self.impl.getClass(package, i)
# class_kind = self.impl.getClassKind(key)
# if class_kind == kind:
# if kind == CLASS_OBJECT:
# clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)}))
# elif kind == CLASS_EVENT:
# clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)}))
# return clist
# def bind_package(self, package):
# return self.impl.bindPackage(package)
# def bind_class(self, kwargs = {}):
# if "key" in kwargs:
# self.impl.bindClass(kwargs["key"])
# elif "package" in kwargs:
# package = kwargs["package"]
# if "class" in kwargs:
# self.impl.bindClass(package, kwargs["class"])
# else:
# self.impl.bindClass(package)
# else:
# raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']")
# def get_agents(self, broker=None):
# blist = []
# if broker:
# blist.append(broker)
# else:
# self._cv.acquire()
# try:
# # copy while holding lock
# blist = self._broker_list[:]
# finally:
# self._cv.release()
# agents = []
# for b in blist:
# for idx in range(b.impl.agentCount()):
# agents.append(AgentProxy(b.impl.getAgent(idx), b))
# return agents
# def get_objects(self, query, kwargs = {}):
# timeout = 30
# agent = None
# temp_args = kwargs.copy()
# if type(query) == type({}):
# temp_args.update(query)
# if "_timeout" in temp_args:
# timeout = temp_args["_timeout"]
# temp_args.pop("_timeout")
# if "_agent" in temp_args:
# agent = temp_args["_agent"]
# temp_args.pop("_agent")
# if type(query) == type({}):
# query = Query(temp_args)
# self._select = {}
# for k in temp_args.iterkeys():
# if type(k) == str:
# self._select[k] = temp_args[k]
# self._cv.acquire()
# try:
# self._sync_count = 1
# self._sync_result = []
# broker = self._broker_list[0]
# broker.send_query(query.impl, None, agent)
# self._cv.wait(timeout)
# if self._sync_count == 1:
# raise Exception("Timed out: waiting for query response")
# finally:
# self._cv.release()
# return self._sync_result
# def get_object(self, query, kwargs = {}):
# '''
# Return one and only one object or None.
# '''
# objs = objects(query, kwargs)
# if len(objs) == 1:
# return objs[0]
# else:
# return None
# def first_object(self, query, kwargs = {}):
# '''
# Return the first of potentially many objects.
# '''
# objs = objects(query, kwargs)
# if objs:
# return objs[0]
# else:
# return None
# # Check the object against select to check for a match
# def _select_match(self, object):
# schema_props = object.properties()
# for key in self._select.iterkeys():
# for prop in schema_props:
# if key == p[0].name() and self._select[key] != p[1]:
# return False
# return True
# def _get_result(self, list, context):
# '''
# Called by Broker proxy to return the result of a query.
# '''
# self._cv.acquire()
# try:
# for item in list:
# if self._select_match(item):
# self._sync_result.append(item)
# self._sync_count -= 1
# self._cv.notify()
# finally:
# self._cv.release()
# def start_sync(self, query): pass
# def touch_sync(self, sync): pass
# def end_sync(self, sync): pass
# def start_console_events(self):
# self._cb_cond.acquire()
# try:
# self._cb_cond.notify()
# finally:
# self._cb_cond.release()
# def _do_console_events(self):
# '''
# Called by the Console thread to poll for events. Passes the events
# onto the ConsoleHandler associated with this Console. Is called
# periodically, but can also be kicked by Console.start_console_events().
# '''
# count = 0
# valid = self.impl.getEvent(self._event)
# while valid:
# count += 1
# try:
# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
# trace.debug("Console Event AGENT_ADDED received")
# if self._handler:
# self._handler.agent_added(AgentProxy(self._event.agent, None))
# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
# trace.debug("Console Event AGENT_DELETED received")
# if self._handler:
# self._handler.agent_deleted(AgentProxy(self._event.agent, None))
# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
# trace.debug("Console Event NEW_PACKAGE received")
# if self._handler:
# self._handler.new_package(self._event.name)
# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
# trace.debug("Console Event NEW_CLASS received")
# if self._handler:
# self._handler.new_class(SchemaClassKey(self._event.classKey))
# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
# trace.debug("Console Event OBJECT_UPDATE received")
# if self._handler:
# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
# self._event.hasProps, self._event.hasStats)
# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
# trace.debug("Console Event EVENT_RECEIVED received")
# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
# trace.debug("Console Event AGENT_HEARTBEAT received")
# if self._handler:
# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
# trace.debug("Console Event METHOD_RESPONSE received")
# else:
# trace.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
# except e:
# print "Exception caught in callback thread:", e
# self.impl.popEvent()
# valid = self.impl.getEvent(self._event)
# return count
# class Broker(ConnectionHandler):
# # attr_reader :impl :conn, :console, :broker_bank
# def __init__(self, console, conn):
# self.broker_bank = 1
# self.console = console
# self.conn = conn
# self._session = None
# self._cv = Condition()
# self._stable = None
# self._event = qmfengine.BrokerEvent()
# self._xmtMessage = qmfengine.Message()
# self.impl = qmfengine.BrokerProxy(self.console.impl)
# self.console.impl.addConnection(self.impl, self)
# self.conn.add_conn_handler(self)
# self._operational = True
# def shutdown(self):
# trace.debug("broker.shutdown() called.")
# self.console.impl.delConnection(self.impl)
# self.conn.del_conn_handler(self)
# if self._session:
# self.impl.sessionClosed()
# trace.debug("broker.shutdown() sessionClosed done.")
# self._session.destroy()
# trace.debug("broker.shutdown() session destroy done.")
# self._session = None
# self._operational = False
# trace.debug("broker.shutdown() done.")
# def wait_for_stable(self, timeout = None):
# self._cv.acquire()
# try:
# if self._stable:
# return
# if timeout:
# self._cv.wait(timeout)
# if not self._stable:
# raise Exception("Timed out: waiting for broker connection to become stable")
# else:
# while not self._stable:
# self._cv.wait()
# finally:
# self._cv.release()
# def send_query(self, query, ctx, agent):
# agent_impl = None
# if agent:
# agent_impl = agent.impl
# self.impl.sendQuery(query, ctx, agent_impl)
# self.conn.kick()
# def _do_broker_events(self):
# count = 0
# valid = self.impl.getEvent(self._event)
# while valid:
# count += 1
# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
# trace.debug("Broker Event BROKER_INFO received");
# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
# trace.debug("Broker Event DECLARE_QUEUE received");
# self.conn.impl.declareQueue(self._session.handle, self._event.name)
# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
# trace.debug("Broker Event DELETE_QUEUE received");
# self.conn.impl.deleteQueue(self._session.handle, self._event.name)
# elif self._event.kind == qmfengine.BrokerEvent.BIND:
# trace.debug("Broker Event BIND received");
# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
# elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
# trace.debug("Broker Event UNBIND received");
# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
# trace.debug("Broker Event SETUP_COMPLETE received");
# self.impl.startProtocol()
# elif self._event.kind == qmfengine.BrokerEvent.STABLE:
# trace.debug("Broker Event STABLE received");
# self._cv.acquire()
# try:
# self._stable = True
# self._cv.notify()
# finally:
# self._cv.release()
# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE:
# result = []
# for idx in range(self._event.queryResponse.getObjectCount()):
# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self}))
# self.console._get_result(result, self._event.context)
# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE:
# obj = self._event.context
# obj._method_result(MethodResponse(self._event.methodResponse()))
# self.impl.popEvent()
# valid = self.impl.getEvent(self._event)
# return count
# def _do_broker_messages(self):
# count = 0
# valid = self.impl.getXmtMessage(self._xmtMessage)
# while valid:
# count += 1
# trace.debug("Broker: sending msg on connection")
# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
# self.impl.popXmt()
# valid = self.impl.getXmtMessage(self._xmtMessage)
# return count
# def _do_events(self):
# while True:
# self.console.start_console_events()
# bcnt = self._do_broker_events()
# mcnt = self._do_broker_messages()
# if bcnt == 0 and mcnt == 0:
# break;
# def conn_event_connected(self):
# trace.debug("Broker: Connection event CONNECTED")
# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
# self.impl.sessionOpened(self._session.handle)
# self._do_events()
# def conn_event_disconnected(self, error):
# trace.debug("Broker: Connection event DISCONNECTED")
# pass
# def conn_event_visit(self):
# self._do_events()
# def sess_event_session_closed(self, context, error):
# trace.debug("Broker: Session event CLOSED")
# self.impl.sessionClosed()
# def sess_event_recv(self, context, message):
# trace.debug("Broker: Session event MSG_RECV")
# if not self._operational:
# log.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
# self.impl.handleRcvMessage(message)
# self._do_events()
################################################################################
################################################################################
################################################################################
################################################################################
# TEMPORARY TEST CODE - TO BE DELETED
################################################################################
################################################################################
################################################################################
################################################################################
if __name__ == '__main__':
# temp test code
import logging
from common import (qmfTypes, SchemaProperty)
logging.getLogger().setLevel(logging.INFO)
logging.info( "************* Creating Async Console **************" )
class MyNotifier(Notifier):
def __init__(self, context):
self._myContext = context
self.WorkAvailable = False
def indication(self):
print("Indication received! context=%d" % self._myContext)
self.WorkAvailable = True
_noteMe = MyNotifier( 666 )
_myConsole = Console(notifier=_noteMe)
_myConsole.enable_agent_discovery()
logging.info("Waiting...")
logging.info( "Destroying console:" )
_myConsole.destroy( 10 )
logging.info( "******** Messing around with Schema ********" )
_sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass",
stype=SchemaClassId.TYPE_EVENT),
_desc="A typical event schema",
_props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8,
kwargs = {"min":0,
"max":100,
"unit":"seconds",
"desc":"sleep value"}),
"Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR,
kwargs={"maxlen":100,
"desc":"a string argument"})})
print("_sec=%s" % _sec.get_class_id())
print("_sec.gePropertyCount()=%d" % _sec.get_property_count() )
print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') )
print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') )
try:
print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') )
except:
pass
print("_sec.getProperties()='%s'" % _sec.get_properties())
print("Adding another argument")
_arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL,
kwargs={"dir":"IO",
"desc":"a boolean argument"})
_sec.add_property('Argument-3', _arg3)
print("_sec=%s" % _sec.get_class_id())
print("_sec.getPropertyCount()=%d" % _sec.get_property_count() )
print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') )
print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') )
print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') )
print("_arg3.mapEncode()='%s'" % _arg3.map_encode() )
_secmap = _sec.map_encode()
print("_sec.mapEncode()='%s'" % _secmap )
_sec2 = SchemaEventClass( _map=_secmap )
print("_sec=%s" % _sec.get_class_id())
print("_sec2=%s" % _sec2.get_class_id())
_soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage",
"_class_name": "myOtherClass",
"_type": "_data"},
"_desc": "A test data object",
"_values":
{"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
"access": "RO",
"index": True,
"unit": "degrees"},
"prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
"access": "RW",
"index": True,
"desc": "The Second Property(tm)",
"unit": "radians"},
"statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
"unit": "seconds",
"desc": "time until I retire"},
"meth1": {"_desc": "A test method",
"_arguments":
{"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
"desc": "an argument 1",
"dir": "I"},
"arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
"dir": "IO",
"desc": "some weird boolean"}}},
"meth2": {"_desc": "A test method",
"_arguments":
{"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
"desc": "an 'nuther argument",
"dir":
"I"}}}},
"_subtypes":
{"prop1":"qmfProperty",
"prop2":"qmfProperty",
"statistics":"qmfProperty",
"meth1":"qmfMethod",
"meth2":"qmfMethod"},
"_primary_key_names": ["prop2", "prop1"]})
print("_soc='%s'" % _soc)
print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names())
print("_soc.getPropertyCount='%d'" % _soc.get_property_count())
print("_soc.getProperties='%s'" % _soc.get_properties())
print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2'))
print("_soc.getMethodCount='%d'" % _soc.get_method_count())
print("_soc.getMethods='%s'" % _soc.get_methods())
print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2'))
_socmap = _soc.map_encode()
print("_socmap='%s'" % _socmap)
_soc2 = SchemaObjectClass( _map=_socmap )
print("_soc='%s'" % _soc)
print("_soc2='%s'" % _soc2)
if _soc2.get_class_id() == _soc.get_class_id():
print("soc and soc2 are the same schema")
logging.info( "******** Messing around with ObjectIds ********" )
qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
print("qd='%s':" % qd)
print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4))
print("qd map='%s'" % qd.map_encode())
print("qd getProperty('prop4')='%s'" % qd.get_value("prop4"))
qd.set_value("prop4", 4, "A test property called 4")
print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4"))
qd.prop4 = 9
print("qd.prop4 = 9 ='%s'" % qd.prop4)
qd["prop4"] = 11
print("qd[prop4] = 11 ='%s'" % qd["prop4"])
print("qd.mapEncode()='%s'" % qd.map_encode())
_qd2 = QmfData( _map = qd.map_encode() )
print("_qd2.mapEncode()='%s'" % _qd2.map_encode())
_qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666,
"prop2": 0}},
agent="some agent name?",
_schema = _soc)
print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode())
_qmfDesc1._set_schema( _soc )
print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2"))
print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id())
print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id())
_qmfDescMap = _qmfDesc1.map_encode()
print("_qmfDescMap='%s'" % _qmfDescMap)
_qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc )
print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode())
print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2"))
print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id())
logging.info( "******** Messing around with QmfEvents ********" )
_qmfevent1 = QmfEvent( _timestamp = 1111,
_schema = _sec,
_values = {"Argument-1": 77,
"Argument-3": True,
"Argument-2": "a string"})
print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode())
print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp())
_qmfevent1Map = _qmfevent1.map_encode()
_qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec)
print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode())
logging.info( "******** Messing around with Queries ********" )
_q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
[QmfQuery.AND,
[QmfQuery.EQ, "vendor", [QmfQuery.QUOTE, "AVendor"]],
[QmfQuery.EQ, [QmfQuery.QUOTE, "SomeProduct"], "product"],
[QmfQuery.EQ, [QmfQuery.UNQUOTE, "name"], [QmfQuery.QUOTE, "Thingy"]],
[QmfQuery.OR,
[QmfQuery.LE, "temperature", -10],
[QmfQuery.FALSE],
[QmfQuery.EXISTS, "namey"]]])
print("_q1.mapEncode() = [%s]" % _q1.map_encode())