| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| import sys |
| import socket |
| import os |
| import logging |
| from threading import Thread |
| from threading import RLock |
| from threading import Condition |
| import qmfengine |
| from qmfengine import (ACCESS_READ_CREATE, ACCESS_READ_ONLY, ACCESS_READ_WRITE) |
| from qmfengine import (CLASS_EVENT, CLASS_OBJECT) |
| from qmfengine import (DIR_IN, DIR_IN_OUT, DIR_OUT) |
| from qmfengine import (TYPE_ABSTIME, TYPE_ARRAY, TYPE_BOOL, TYPE_DELTATIME, |
| TYPE_DOUBLE, TYPE_FLOAT, TYPE_INT16, TYPE_INT32, TYPE_INT64, |
| TYPE_INT8, TYPE_LIST, TYPE_LSTR, TYPE_MAP, TYPE_OBJECT, |
| TYPE_REF, TYPE_SSTR, TYPE_UINT16, TYPE_UINT32, TYPE_UINT64, |
| TYPE_UINT8, TYPE_UUID) |
| from qmfengine import (O_EQ, O_NE, O_LT, O_LE, O_GT, O_GE, O_RE_MATCH, O_RE_NOMATCH, |
| E_NOT, E_AND, E_OR, E_XOR) |
| from qmfengine import (SEV_EMERG, SEV_ALERT, SEV_CRIT, SEV_ERROR, SEV_WARN, SEV_NOTICE, |
| SEV_INFORM, SEV_DEBUG) |
| |
| |
| def qmf_to_native(val): |
| typecode = val.getType() |
| if typecode == TYPE_UINT8: return val.asUint() |
| elif typecode == TYPE_UINT16: return val.asUint() |
| elif typecode == TYPE_UINT32: return val.asUint() |
| elif typecode == TYPE_UINT64: return val.asUint64() |
| elif typecode == TYPE_SSTR: return val.asString() |
| elif typecode == TYPE_LSTR: return val.asString() |
| elif typecode == TYPE_ABSTIME: return val.asInt64() |
| elif typecode == TYPE_DELTATIME: return val.asUint64() |
| elif typecode == TYPE_REF: return ObjectId(val.asObjectId()) |
| elif typecode == TYPE_BOOL: return val.asBool() |
| elif typecode == TYPE_FLOAT: return val.asFloat() |
| elif typecode == TYPE_DOUBLE: return val.asDouble() |
| elif typecode == TYPE_UUID: return val.asUuid() |
| elif typecode == TYPE_INT8: return val.asInt() |
| elif typecode == TYPE_INT16: return val.asInt() |
| elif typecode == TYPE_INT32: return val.asInt() |
| elif typecode == TYPE_INT64: return val.asInt64() |
| elif typecode == TYPE_MAP: return value_to_dict(val) |
| elif typecode == TYPE_LIST: return value_to_list(val) |
| else: |
| # when TYPE_OBJECT |
| logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) ) |
| return None |
| |
| |
| def native_to_qmf(target, value): |
| val = None |
| typecode = None |
| if target.__class__ == qmfengine.Value: |
| val = target |
| typecode = val.getType() |
| else: |
| typecode = target |
| val = qmfengine.Value(typecode) |
| |
| if typecode == TYPE_UINT8: val.setUint(value) |
| elif typecode == TYPE_UINT16: val.setUint(value) |
| elif typecode == TYPE_UINT32: val.setUint(value) |
| elif typecode == TYPE_UINT64: val.setUint64(value) |
| elif typecode == TYPE_SSTR: |
| if value: val.setString(value) |
| else: val.setString('') |
| elif typecode == TYPE_LSTR: |
| if value: val.setString(value) |
| else: val.setString('') |
| elif typecode == TYPE_ABSTIME: val.setInt64(value) |
| elif typecode == TYPE_DELTATIME: val.setUint64(value) |
| elif typecode == TYPE_REF: val.setObjectId(value.impl) |
| elif typecode == TYPE_BOOL: val.setBool(value) |
| elif typecode == TYPE_FLOAT: val.setFloat(value) |
| elif typecode == TYPE_DOUBLE: val.setDouble(value) |
| elif typecode == TYPE_UUID: val.setUuid(value) |
| elif typecode == TYPE_INT8: val.setInt(value) |
| elif typecode == TYPE_INT16: val.setInt(value) |
| elif typecode == TYPE_INT32: val.setInt(value) |
| elif typecode == TYPE_INT64: val.setInt64(value) |
| elif typecode == TYPE_MAP: dict_to_value(val, value) |
| elif typecode == TYPE_LIST: list_to_value(val, value) |
| else: |
| # when TYPE_OBJECT |
| logging.error("Unsupported type for get_attr? '%s'" % str(val.getType())) |
| return None |
| return val |
| |
| |
| def pick_qmf_type(value): |
| if value.__class__ == int: |
| if value >= 0: |
| if value < 0x100000000: return TYPE_UINT32 |
| return TYPE_UINT64 |
| else: |
| if value > -0xffffffff: return TYPE_INT32 |
| return TYPE_INT64 |
| |
| if value.__class__ == long: |
| if value >= 0: return TYPE_UINT64 |
| return TYPE_INT64 |
| |
| if value.__class__ == str: |
| if len(value) < 256: return TYPE_SSTR |
| return TYPE_LSTR |
| |
| if value.__class__ == float: return TYPE_DOUBLE |
| if value.__class__ == bool: return TYPE_BOOL |
| if value == None: return TYPE_BOOL |
| if value.__class__ == dict: return TYPE_MAP |
| if value.__class__ == list: return TYPE_LIST |
| |
| raise "QMF type not known for native type %s" % value.__class__ |
| |
| |
| def value_to_dict(val): |
| if not val.isMap(): raise "value_to_dict must be given a map value" |
| mymap = {} |
| for i in range(val.keyCount()): |
| key = val.key(i) |
| mymap[key] = qmf_to_native(val.byKey(key)) |
| return mymap |
| |
| |
| def dict_to_value(val, mymap): |
| for key, value in mymap.items(): |
| if key.__class__ != str: raise "QMF map key must be a string" |
| typecode = pick_qmf_type(value) |
| val.insert(key, native_to_qmf(typecode, value)) |
| |
| |
| def value_to_list(val): |
| mylist = [] |
| if val.isList(): |
| for i in range(val.listItemCount()): |
| mylist.append(qmf_to_native(val.listItem(i))) |
| return mylist |
| #if val.isArray(): |
| # for i in range(val.arrayItemCount()): |
| # mylist.append(qmf_to_native(val.arrayItem(i))) |
| # return mylist |
| |
| raise "value_to_list must be given a list value" |
| |
| |
| def list_to_value(val, mylist): |
| for item in mylist: |
| typecode = pick_qmf_type(item) |
| val.appendToList(native_to_qmf(typecode, item)) |
| |
| |
| ##============================================================================== |
| ## CONNECTION |
| ##============================================================================== |
| |
| class ConnectionSettings(object): |
| #attr_reader :impl |
| def __init__(self, url=None): |
| if url: |
| self.impl = qmfengine.ConnectionSettings(url) |
| else: |
| self.impl = qmfengine.ConnectionSettings() |
| |
| |
| def set_attr(self, key, val): |
| if type(val) == str: |
| _v = qmfengine.Value(TYPE_LSTR) |
| _v.setString(val) |
| elif type(val) == int: |
| _v = qmfengine.Value(TYPE_UINT32) |
| _v.setUint(val) |
| elif type(val) == bool: |
| _v = qmfengine.Value(TYPE_BOOL) |
| _v.setBool(val) |
| else: |
| raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, type(val))) |
| |
| good = self.impl.setAttr(key, _v) |
| if not good: |
| raise Exception("Argument error: unsupported attribute '%s'" % key ) |
| |
| |
| def get_attr(self, key): |
| _v = self.impl.getAttr(key) |
| if _v.isString(): |
| return _v.asString() |
| elif _v.isUint(): |
| return _v.asUint() |
| elif _v.isBool(): |
| return _v.asBool() |
| else: |
| raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, str(_v.getType()))) |
| |
| |
| def __getattr__(self, name): |
| return self.get_attr(name) |
| |
| |
| def __setattr__(self, name, value): |
| if name == "impl": |
| return super.__setattr__(self, name, value) |
| return self.set_attr(name, value) |
| |
| |
| |
| class ConnectionHandler: |
| def conn_event_connected(self): None |
| def conn_event_disconnected(self, error): None |
| def conn_event_visit(self): None |
| def sess_event_session_closed(self, context, error): None |
| def sess_event_recv(self, context, message): None |
| |
| |
| |
| class Connection(Thread): |
| def __init__(self, settings): |
| Thread.__init__(self) |
| self._lock = RLock() |
| self.impl = qmfengine.ResilientConnection(settings.impl) |
| self._sockEngine, self._sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) |
| self.impl.setNotifyFd(self._sockEngine.fileno()) |
| self._new_conn_handlers = [] |
| self._conn_handlers_to_delete = [] |
| self._conn_handlers = [] |
| self._connected = False |
| self._operational = True |
| self.start() |
| |
| |
| def destroy(self, timeout=None): |
| logging.debug("Destroying Connection...") |
| self._operational = False |
| self.kick() |
| self.join(timeout) |
| logging.debug("... Conn Destroyed!" ) |
| if self.isAlive(): |
| logging.error("Error: Connection thread '%s' is hung..." % self.getName()) |
| |
| |
| def connected(self): |
| return self._connected |
| |
| |
| def kick(self): |
| self.impl.notify() |
| |
| |
| def add_conn_handler(self, handler): |
| self._lock.acquire() |
| try: |
| self._new_conn_handlers.append(handler) |
| finally: |
| self._lock.release() |
| self.kick() |
| |
| |
| def del_conn_handler(self, handler): |
| self._lock.acquire() |
| try: |
| self._conn_handlers_to_delete.append(handler) |
| finally: |
| self._lock.release() |
| self.kick() |
| |
| |
| def run(self): |
| eventImpl = qmfengine.ResilientConnectionEvent() |
| new_handlers = [] |
| del_handlers = [] |
| bt_count = 0 |
| |
| while self._operational: |
| logging.debug("Connection thread waiting for socket data...") |
| self._sock.recv(1) |
| |
| self._lock.acquire() |
| try: |
| new_handlers = self._new_conn_handlers |
| del_handlers = self._conn_handlers_to_delete |
| self._new_conn_handlers = [] |
| self._conn_handlers_to_delete = [] |
| finally: |
| self._lock.release() |
| |
| for nh in new_handlers: |
| self._conn_handlers.append(nh) |
| if self._connected: |
| nh.conn_event_connected() |
| new_handlers = [] |
| |
| for dh in del_handlers: |
| if dh in self._conn_handlers: |
| self._conn_handlers.remove(dh) |
| del_handlers = [] |
| |
| valid = self.impl.getEvent(eventImpl) |
| while valid: |
| try: |
| if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED: |
| logging.debug("Connection thread: CONNECTED event received.") |
| self._connected = True |
| for h in self._conn_handlers: |
| h.conn_event_connected() |
| |
| elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED: |
| logging.debug("Connection thread: DISCONNECTED event received.") |
| self._connected = False |
| for h in self._conn_handlers: |
| h.conn_event_disconnected(eventImpl.errorText) |
| |
| elif eventImpl.kind == qmfengine.ResilientConnectionEvent.SESSION_CLOSED: |
| logging.debug("Connection thread: SESSION_CLOSED event received.") |
| eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText) |
| |
| elif eventImpl.kind == qmfengine.ResilientConnectionEvent.RECV: |
| logging.debug("Connection thread: RECV event received.") |
| eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message) |
| else: |
| logging.debug("Connection thread received unknown event: '%s'" % str(eventImpl.kind)) |
| |
| except: |
| import traceback |
| logging.error( "Exception occurred during Connection event processing:" ) |
| logging.error( str(sys.exc_info()) ) |
| if bt_count < 2: |
| traceback.print_exc() |
| traceback.print_stack() |
| bt_count += 1 |
| |
| self.impl.popEvent() |
| valid = self.impl.getEvent(eventImpl) |
| |
| for h in self._conn_handlers: |
| h.conn_event_visit() |
| |
| logging.debug("Shutting down Connection thread") |
| |
| |
| |
| class Session: |
| def __init__(self, conn, label, handler): |
| self._conn = conn |
| self._label = label |
| self.handler = handler |
| self.handle = qmfengine.SessionHandle() |
| result = self._conn.impl.createSession(label, self, self.handle) |
| |
| |
| def destroy(self): |
| self._conn.impl.destroySession(self.handle) |
| |
| |
| |
| ##============================================================================== |
| ## OBJECTS and EVENTS |
| ##============================================================================== |
| |
| class QmfEvent(object): |
| # attr_reader :impl, :event_class |
| def __init__(self, cls, kwargs={}): |
| self._allow_sets = True |
| if kwargs.has_key("broker"): |
| self._broker = kwargs["broker"] |
| else: |
| self._broker = None |
| if cls: |
| self.event_class = cls |
| self.impl = qmfengine.Event(self.event_class.impl) |
| elif kwargs.has_key("impl"): |
| self.impl = qmfengine.Event(kwargs["impl"]) |
| self.event_class = SchemaEventClass(None, None, 0, |
| {"impl":self.impl.getClass()}) |
| else: |
| raise Exception("Argument error: required parameter ('impl') not supplied") |
| |
| |
| def arguments(self): |
| list = [] |
| for arg in self.event_class.arguments: |
| list.append([arg, self.get_attr(arg.name())]) |
| return list |
| |
| |
| def get_attr(self, name): |
| val = self._value(name) |
| return qmf_to_native(val) |
| |
| |
| def set_attr(self, name, v): |
| val = self._value(name) |
| native_to_qmf(val, v) |
| |
| |
| def __getitem__(self, name): |
| return self.get_attr(name) |
| |
| |
| def __setitem__(self, name, value): |
| self.set_attr(name, value) |
| |
| |
| def __setattr__(self, name, value): |
| # |
| # Ignore the internal attributes, set them normally... |
| # |
| if (name[0] == '_' or |
| name == 'impl' or |
| name == 'event_class'): |
| return super.__setattr__(self, name, value) |
| |
| if not self._allow_sets: |
| raise Exception("'Set' operations not permitted on this object") |
| |
| # |
| # If the name matches an argument name, set the value of the argument. |
| # |
| # print "set name=%s" % str(name) |
| for arg in self.event_class.arguments: |
| if arg.name() == name: |
| return self.set_attr(name, value) |
| |
| # unrecognized name? should I raise an exception? |
| super.__setattr__(self, name, value) |
| |
| |
| def __getattr__(self, name, *args): |
| # |
| # If the name matches an argument name, return the value of the argument. |
| # |
| for arg in self.event_class.arguments: |
| if arg.name() == name: |
| return self.get_attr(name) |
| |
| # |
| # This name means nothing to us, pass it up the line to the parent |
| # class's handler. |
| # |
| # print "__getattr__=%s" % str(name) |
| super.__getattr__(self, name) |
| |
| |
| def _value(self, name): |
| val = self.impl.getValue(name) |
| if not val: |
| raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % |
| (name, |
| self.event_class.impl.getClassKey().getPackageName(), |
| self.event_class.impl.getClassKey().getClassName())) |
| return val |
| |
| |
| class QmfObject(object): |
| # attr_reader :impl, :object_class |
| def __init__(self, cls, kwargs={}): |
| self._cv = Condition() |
| self._sync_count = 0 |
| self._sync_result = None |
| self._allow_sets = False |
| if kwargs.has_key("broker"): |
| self._broker = kwargs["broker"] |
| else: |
| self._broker = None |
| if cls: |
| self.object_class = cls |
| self.impl = qmfengine.Object(self.object_class.impl) |
| elif kwargs.has_key("impl"): |
| self.impl = qmfengine.Object(kwargs["impl"]) |
| self.object_class = SchemaObjectClass(None, |
| None, |
| {"impl":self.impl.getClass()}) |
| else: |
| raise Exception("Argument error: required parameter ('impl') not supplied") |
| |
| |
| def destroy(self): |
| self.impl.destroy() |
| |
| |
| def object_id(self): |
| return ObjectId(self.impl.getObjectId()) |
| |
| |
| def set_object_id(self, oid): |
| self.impl.setObjectId(oid.impl) |
| |
| |
| def properties(self): |
| list = [] |
| for prop in self.object_class.properties: |
| list.append([prop, self.get_attr(prop.name())]) |
| return list |
| |
| |
| def statistics(self): |
| list = [] |
| for stat in self.object_class.statistics: |
| list.append([stat, self.get_attr(stat.name())]) |
| return list |
| |
| |
| def get_attr(self, name): |
| val = self._value(name) |
| return qmf_to_native(val) |
| |
| |
| def set_attr(self, name, v): |
| val = self._value(name) |
| native_to_qmf(val, v) |
| |
| |
| def __getitem__(self, name): |
| return self.get_attr(name) |
| |
| |
| def __setitem__(self, name, value): |
| self.set_attr(name, value) |
| |
| |
| def inc_attr(self, name, by=1): |
| self.set_attr(name, self.get_attr(name) + by) |
| |
| |
| def dec_attr(self, name, by=1): |
| self.set_attr(name, self.get_attr(name) - by) |
| |
| |
| def __setattr__(self, name, value): |
| # |
| # Ignore the internal attributes, set them normally... |
| # |
| if (name[0] == '_' or |
| name == 'impl' or |
| name == 'object_class'): |
| return super.__setattr__(self, name, value) |
| |
| if not self._allow_sets: |
| raise Exception("'Set' operations not permitted on this object") |
| # |
| # If the name matches a property name, set the value of the property. |
| # |
| # print "set name=%s" % str(name) |
| for prop in self.object_class.properties: |
| if prop.name() == name: |
| return self.set_attr(name, value) |
| # |
| # otherwise, check for a statistic set... |
| # |
| for stat in self.object_class.statistics: |
| if stat.name() == name: |
| return self.set_attr(name, value) |
| |
| # unrecognized name? should I raise an exception? |
| super.__setattr__(self, name, value) |
| |
| |
| def __getattr__(self, name, *args): |
| # |
| # If the name matches a property name, return the value of the property. |
| # |
| for prop in self.object_class.properties: |
| if prop.name() == name: |
| return self.get_attr(name) |
| # |
| # Do the same for statistics |
| # |
| for stat in self.object_class.statistics: |
| if stat.name() == name: |
| return self.get_attr(name) |
| # |
| # If we still haven't found a match for the name, check to see if |
| # it matches a method name. If so, marshall up the arguments into |
| # a map, and invoke the method. |
| # |
| for method in self.object_class.methods: |
| if method.name() == name: |
| argMap = self._marshall(method, args) |
| return lambda name, argMap : self._invokeMethod(name, argMap) |
| |
| # |
| # This name means nothing to us, pass it up the line to the parent |
| # class's handler. |
| # |
| # print "__getattr__=%s" % str(name) |
| super.__getattr__(self, name) |
| |
| |
| def _invokeMethod(self, name, argMap): |
| """ |
| Private: Helper function that invokes an object's method, and waits for the result. |
| """ |
| self._cv.acquire() |
| try: |
| timeout = 30 |
| self._sync_count = 1 |
| self.impl.invokeMethod(name, argMap, self) |
| if self._broker: |
| self._broker.conn.kick() |
| self._cv.wait(timeout) |
| if self._sync_count == 1: |
| raise Exception("Timed out: waiting for response to method call.") |
| finally: |
| self._cv.release() |
| |
| return self._sync_result |
| |
| |
| def _method_result(self, result): |
| """ |
| Called to return the result of a method call on an object |
| """ |
| self._cv.acquire(); |
| try: |
| self._sync_result = result |
| self._sync_count -= 1 |
| self._cv.notify() |
| finally: |
| self._cv.release() |
| |
| |
| def _marshall(schema, args): |
| ''' |
| Private: Convert a list of arguments (positional) into a Value object of type "map". |
| Used to create the argument parameter for an object's method invokation. |
| ''' |
| # Build a map of the method's arguments |
| map = qmfengine.Value(TYPE_MAP) |
| for arg in schema.arguments: |
| if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: |
| map.insert(arg.name, qmfengine.Value(arg.typecode)) |
| |
| # install each argument's value into the map |
| marshalled = Arguments(map) |
| idx = 0 |
| for arg in schema.arguments: |
| if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT: |
| if args[idx]: |
| marshalled[arg.name] = args[idx] |
| idx += 1 |
| |
| return marshalled.map |
| |
| |
| def _value(self, name): |
| val = self.impl.getValue(name) |
| if not val: |
| raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" % |
| (name, |
| self.object_class.impl.getClassKey().getPackageName(), |
| self.object_class.impl.getClassKey().getClassName())) |
| return val |
| |
| |
| |
| class AgentObject(QmfObject): |
| def __init__(self, cls, kwargs={}): |
| QmfObject.__init__(self, cls, kwargs) |
| self._allow_sets = True |
| |
| |
| def destroy(self): |
| self.impl.destroy() |
| |
| |
| def set_object_id(self, oid): |
| self.impl.setObjectId(oid.impl) |
| |
| |
| |
| class ConsoleObject(QmfObject): |
| # attr_reader :current_time, :create_time, :delete_time |
| def __init__(self, cls, kwargs={}): |
| QmfObject.__init__(self, cls, kwargs) |
| |
| |
| def update(self): |
| if not self._broker: |
| raise Exception("No linkage to broker") |
| newer = self._broker.console.objects(Query({"object_id":object_id})) |
| if newer.size != 1: |
| raise Exception("Expected exactly one update for this object, %d present" % int(newer.size)) |
| self.merge_update(newer[0]) |
| |
| |
| def merge_update(self, newObject): |
| self.impl.merge(new_object.impl) |
| |
| |
| def is_deleted(self): |
| return self.impl.isDeleted() |
| |
| |
| def key(self): pass |
| |
| |
| |
| class ObjectId: |
| def __init__(self, impl=None): |
| if impl: |
| self.impl = impl |
| else: |
| self.impl = qmfengine.ObjectId() |
| self.agent_key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank()) |
| |
| |
| def object_num_high(self): |
| return self.impl.getObjectNumHi() |
| |
| |
| def object_num_low(self): |
| return self.impl.getObjectNumLo() |
| |
| |
| def agent_key(self): |
| self.agent_key |
| |
| def __eq__(self, other): |
| if not isinstance(other, self.__class__): return False |
| return self.impl == other.impl |
| |
| def __ne__(self, other): |
| return not self.__eq__(other) |
| |
| def __repr__(self): |
| return self.impl.str() |
| |
| |
| |
| class Arguments(object): |
| def __init__(self, map): |
| self.map = map |
| self._by_hash = {} |
| key_count = self.map.keyCount() |
| a = 0 |
| while a < key_count: |
| key = self.map.key(a) |
| self._by_hash[key] = qmf_to_native(self.map.byKey(key)) |
| a += 1 |
| |
| |
| def __getitem__(self, key): |
| return self._by_hash[key] |
| |
| |
| def __setitem__(self, key, value): |
| self._by_hash[key] = value |
| self.set(key, value) |
| |
| |
| def __iter__(self): |
| return self._by_hash.__iter__ |
| |
| |
| def __getattr__(self, name): |
| if name in self._by_hash: |
| return self._by_hash[name] |
| return super.__getattr__(self, name) |
| |
| |
| def __setattr__(self, name, value): |
| # |
| # ignore local data members |
| # |
| if (name[0] == '_' or |
| name == 'map'): |
| return super.__setattr__(self, name, value) |
| |
| if name in self._by_hash: |
| self._by_hash[name] = value |
| return self.set(name, value) |
| |
| return super.__setattr__(self, name, value) |
| |
| |
| def set(self, key, value): |
| val = self.map.byKey(key) |
| native_to_qmf(val, value) |
| |
| |
| |
| class MethodResponse(object): |
| def __init__(self, impl): |
| self.impl = qmfengine.MethodResponse(impl) |
| |
| |
| def status(self): |
| return self.impl.getStatus() |
| |
| |
| def exception(self): |
| return self.impl.getException() |
| |
| |
| def text(self): |
| return exception().asString() |
| |
| |
| def args(self): |
| return Arguments(self.impl.getArgs()) |
| |
| |
| def __getattr__(self, name): |
| myArgs = self.args() |
| return myArgs.__getattr__(name) |
| |
| |
| def __setattr__(self, name, value): |
| if name == 'impl': |
| return super.__setattr__(self, name, value) |
| |
| myArgs = self.args() |
| return myArgs.__setattr__(name, value) |
| |
| |
| |
| ##============================================================================== |
| ## QUERY |
| ##============================================================================== |
| |
| |
| class Query: |
| def __init__(self, kwargs={}): |
| if "impl" in kwargs: |
| self.impl = kwargs["impl"] |
| else: |
| package = '' |
| if "key" in kwargs: |
| # construct using SchemaClassKey: |
| self.impl = qmfengine.Query(kwargs["key"]) |
| elif "object_id" in kwargs: |
| self.impl = qmfengine.Query(kwargs["object_id"].impl) |
| else: |
| if "package" in kwargs: |
| package = kwargs["package"] |
| if "class" in kwargs: |
| self.impl = qmfengine.Query(kwargs["class"], package) |
| else: |
| raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']") |
| |
| |
| def package_name(self): return self.impl.getPackage() |
| def class_name(self): return self.impl.getClass() |
| def object_id(self): |
| _objid = self.impl.getObjectId() |
| if _objid: |
| return ObjectId(_objid) |
| else: |
| return None |
| |
| |
| ##============================================================================== |
| ## SCHEMA |
| ##============================================================================== |
| |
| |
| |
| class SchemaArgument: |
| #attr_reader :impl |
| def __init__(self, name, typecode, kwargs={}): |
| if "impl" in kwargs: |
| self.impl = kwargs["impl"] |
| else: |
| self.impl = qmfengine.SchemaArgument(name, typecode) |
| if kwargs.has_key("dir"): self.impl.setDirection(kwargs["dir"]) |
| if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) |
| if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) |
| |
| |
| def name(self): |
| return self.impl.getName() |
| |
| |
| def direction(self): |
| return self.impl.getDirection() |
| |
| |
| def typecode(self): |
| return self.impl.getType() |
| |
| |
| def __repr__(self): |
| return self.name() |
| |
| |
| |
| class SchemaMethod: |
| # attr_reader :impl, arguments |
| def __init__(self, name, kwargs={}): |
| self.arguments = [] |
| if "impl" in kwargs: |
| self.impl = kwargs["impl"] |
| for i in range(self.impl.getArgumentCount()): |
| self.arguments.append(SchemaArgument(None,None,{"impl":self.impl.getArgument(i)})) |
| else: |
| self.impl = qmfengine.SchemaMethod(name) |
| if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) |
| |
| |
| def add_argument(self, arg): |
| self.arguments.append(arg) |
| self.impl.addArgument(arg.impl) |
| |
| def name(self): |
| return self.impl.getName() |
| |
| def __repr__(self): |
| return self.name() |
| |
| |
| |
| class SchemaProperty: |
| #attr_reader :impl |
| def __init__(self, name, typecode, kwargs={}): |
| if "impl" in kwargs: |
| self.impl = kwargs["impl"] |
| else: |
| self.impl = qmfengine.SchemaProperty(name, typecode) |
| if kwargs.has_key("access"): self.impl.setAccess(kwargs["access"]) |
| if kwargs.has_key("index"): self.impl.setIndex(kwargs["index"]) |
| if kwargs.has_key("optional"): self.impl.setOptional(kwargs["optional"]) |
| if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) |
| if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) |
| |
| |
| def name(self): |
| return self.impl.getName() |
| |
| def __repr__(self): |
| return self.name() |
| |
| |
| |
| class SchemaStatistic: |
| # attr_reader :impl |
| def __init__(self, name, typecode, kwargs={}): |
| if "impl" in kwargs: |
| self.impl = kwargs["impl"] |
| else: |
| self.impl = qmfengine.SchemaStatistic(name, typecode) |
| if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"]) |
| if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) |
| |
| |
| def name(self): |
| return self.impl.getName() |
| |
| def __repr__(self): |
| return self.name() |
| |
| |
| |
| class SchemaClassKey: |
| #attr_reader :impl |
| def __init__(self, i): |
| self.impl = i |
| |
| |
| def package_name(self): |
| return self.impl.getPackageName() |
| |
| |
| def class_name(self): |
| return self.impl.getClassName() |
| |
| def __repr__(self): |
| return self.impl.asString() |
| |
| |
| |
| class SchemaObjectClass: |
| # attr_reader :impl, :properties, :statistics, :methods |
| def __init__(self, package, name, kwargs={}): |
| self.properties = [] |
| self.statistics = [] |
| self.methods = [] |
| if "impl" in kwargs: |
| self.impl = kwargs["impl"] |
| |
| for i in range(self.impl.getPropertyCount()): |
| self.properties.append(SchemaProperty(None, None, {"impl":self.impl.getProperty(i)})) |
| |
| for i in range(self.impl.getStatisticCount()): |
| self.statistics.append(SchemaStatistic(None, None, {"impl":self.impl.getStatistic(i)})) |
| |
| for i in range(self.impl.getMethodCount()): |
| self.methods.append(SchemaMethod(None, {"impl":self.impl.getMethod(i)})) |
| else: |
| self.impl = qmfengine.SchemaObjectClass(package, name) |
| |
| |
| def add_property(self, prop): |
| self.properties.append(prop) |
| self.impl.addProperty(prop.impl) |
| |
| |
| def add_statistic(self, stat): |
| self.statistics.append(stat) |
| self.impl.addStatistic(stat.impl) |
| |
| |
| def add_method(self, meth): |
| self.methods.append(meth) |
| self.impl.addMethod(meth.impl) |
| |
| |
| def class_key(self): |
| return SchemaClassKey(self.impl.getClassKey()) |
| |
| |
| def package_name(self): |
| return self.impl.getClassKey().getPackageName() |
| |
| |
| def class_name(self): |
| return self.impl.getClassKey().getClassName() |
| |
| |
| |
| class SchemaEventClass: |
| # attr_reader :impl :arguments |
| def __init__(self, package, name, sev, kwargs={}): |
| self.arguments = [] |
| if "impl" in kwargs: |
| self.impl = kwargs["impl"] |
| for i in range(self.impl.getArgumentCount()): |
| self.arguments.append(SchemaArgument(nil, nil, {"impl":self.impl.getArgument(i)})) |
| else: |
| self.impl = qmfengine.SchemaEventClass(package, name, sev) |
| if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"]) |
| |
| |
| def add_argument(self, arg): |
| self.arguments.append(arg) |
| self.impl.addArgument(arg.impl) |
| |
| |
| def name(self): |
| return self.impl.getClassKey().getClassName() |
| |
| def class_key(self): |
| return SchemaClassKey(self.impl.getClassKey()) |
| |
| |
| def package_name(self): |
| return self.impl.getClassKey().getPackageName() |
| |
| |
| def class_name(self): |
| return self.impl.getClassKey().getClassName() |
| |
| |
| ##============================================================================== |
| ## CONSOLE |
| ##============================================================================== |
| |
| |
| |
| class ConsoleHandler: |
| def agent_added(self, agent): pass |
| def agent_deleted(self, agent): pass |
| def new_package(self, package): pass |
| def new_class(self, class_key): pass |
| def object_update(self, obj, hasProps, hasStats): pass |
| def event_received(self, event): pass |
| def agent_heartbeat(self, agent, timestamp): pass |
| def method_response(self, resp): pass |
| def broker_info(self, broker): pass |
| |
| |
| |
| class Console(Thread): |
| # attr_reader :impl |
| def __init__(self, handler=None, kwargs={}): |
| Thread.__init__(self) |
| self._handler = handler |
| self.impl = qmfengine.Console() |
| self._event = qmfengine.ConsoleEvent() |
| self._broker_list = [] |
| self._cv = Condition() |
| self._sync_count = 0 |
| self._sync_result = None |
| self._select = {} |
| self._cb_cond = Condition() |
| self._operational = True |
| self.start() |
| |
| |
| def destroy(self, timeout=None): |
| logging.debug("Destroying Console...") |
| self._operational = False |
| self.start_console_events() # wake thread up |
| self.join(timeout) |
| logging.debug("... Console Destroyed!") |
| if self.isAlive(): |
| logging.error( "Console thread '%s' is hung..." % self.getName() ) |
| |
| |
| def add_connection(self, conn): |
| broker = Broker(self, conn) |
| self._cv.acquire() |
| try: |
| self._broker_list.append(broker) |
| finally: |
| self._cv.release() |
| return broker |
| |
| |
| def del_connection(self, broker): |
| logging.debug("shutting down broker...") |
| broker.shutdown() |
| logging.debug("...broker down.") |
| self._cv.acquire() |
| try: |
| self._broker_list.remove(broker) |
| finally: |
| self._cv.release() |
| logging.debug("del_connection() finished") |
| |
| |
| def packages(self): |
| plist = [] |
| for i in range(self.impl.packageCount()): |
| plist.append(self.impl.getPackageName(i)) |
| return plist |
| |
| |
| def classes(self, package, kind=CLASS_OBJECT): |
| clist = [] |
| for i in range(self.impl.classCount(package)): |
| key = self.impl.getClass(package, i) |
| class_kind = self.impl.getClassKind(key) |
| if class_kind == kind: |
| if kind == CLASS_OBJECT: |
| clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) |
| elif kind == CLASS_EVENT: |
| clist.append(SchemaEventClass(None, None, 0, {"impl":self.impl.getEventClass(key)})) |
| return clist |
| |
| |
| def bind_package(self, package): |
| return self.impl.bindPackage(package) |
| |
| |
| def bind_class(self, kwargs = {}): |
| if "key" in kwargs: |
| self.impl.bindClass(kwargs["key"]) |
| elif "package" in kwargs: |
| package = kwargs["package"] |
| if "class" in kwargs: |
| self.impl.bindClass(package, kwargs["class"]) |
| else: |
| self.impl.bindClass(package, "*") |
| else: |
| raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") |
| |
| |
| def bind_event(self, kwargs = {}): |
| if "key" in kwargs: |
| self.impl.bindEvent(kwargs["key"]) |
| elif "package" in kwargs: |
| package = kwargs["package"] |
| if "event" in kwargs: |
| self.impl.bindEvent(package, kwargs["event"]) |
| else: |
| self.impl.bindEvent(package, "*") |
| else: |
| raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'event']") |
| |
| |
| def agents(self, broker=None): |
| blist = [] |
| if broker: |
| blist.append(broker) |
| else: |
| self._cv.acquire() |
| try: |
| # copy while holding lock |
| blist = self._broker_list[:] |
| finally: |
| self._cv.release() |
| |
| agents = [] |
| for b in blist: |
| for idx in range(b.impl.agentCount()): |
| agents.append(AgentProxy(b.impl.getAgent(idx), b)) |
| |
| return agents |
| |
| |
| def objects(self, query, kwargs = {}): |
| timeout = 30 |
| agent = None |
| temp_args = kwargs.copy() |
| if type(query) == type({}): |
| temp_args.update(query) |
| |
| if "_timeout" in temp_args: |
| timeout = temp_args["_timeout"] |
| temp_args.pop("_timeout") |
| |
| if "_agent" in temp_args: |
| agent = temp_args["_agent"] |
| temp_args.pop("_agent") |
| |
| if type(query) == type({}): |
| query = Query(temp_args) |
| |
| self._select = {} |
| for k in temp_args.iterkeys(): |
| if type(k) == str: |
| self._select[k] = temp_args[k] |
| |
| self._cv.acquire() |
| try: |
| self._sync_count = 1 |
| self._sync_result = [] |
| broker = self._broker_list[0] |
| broker.send_query(query.impl, None, agent) |
| self._cv.wait(timeout) |
| if self._sync_count == 1: |
| raise Exception("Timed out: waiting for query response") |
| finally: |
| self._cv.release() |
| |
| return self._sync_result |
| |
| |
| def object(self, query, kwargs = {}): |
| ''' |
| Return one and only one object or None. |
| ''' |
| objs = objects(query, kwargs) |
| if len(objs) == 1: |
| return objs[0] |
| else: |
| return None |
| |
| |
| def first_object(self, query, kwargs = {}): |
| ''' |
| Return the first of potentially many objects. |
| ''' |
| objs = objects(query, kwargs) |
| if objs: |
| return objs[0] |
| else: |
| return None |
| |
| |
| # Check the object against select to check for a match |
| def _select_match(self, object): |
| schema_props = object.properties() |
| for key in self._select.iterkeys(): |
| for prop in schema_props: |
| if key == p[0].name() and self._select[key] != p[1]: |
| return False |
| return True |
| |
| |
| def _get_result(self, list, context): |
| ''' |
| Called by Broker proxy to return the result of a query. |
| ''' |
| self._cv.acquire() |
| try: |
| for item in list: |
| if self._select_match(item): |
| self._sync_result.append(item) |
| self._sync_count -= 1 |
| self._cv.notify() |
| finally: |
| self._cv.release() |
| |
| |
| def start_sync(self, query): pass |
| |
| |
| def touch_sync(self, sync): pass |
| |
| |
| def end_sync(self, sync): pass |
| |
| |
| def run(self): |
| while self._operational: |
| self._cb_cond.acquire() |
| try: |
| self._cb_cond.wait(1) |
| while self._do_console_events(): |
| pass |
| finally: |
| self._cb_cond.release() |
| logging.debug("Shutting down Console thread") |
| |
| |
| def start_console_events(self): |
| self._cb_cond.acquire() |
| try: |
| self._cb_cond.notify() |
| finally: |
| self._cb_cond.release() |
| |
| |
| def _do_console_events(self): |
| ''' |
| Called by the Console thread to poll for events. Passes the events |
| onto the ConsoleHandler associated with this Console. Is called |
| periodically, but can also be kicked by Console.start_console_events(). |
| ''' |
| count = 0 |
| valid = self.impl.getEvent(self._event) |
| while valid: |
| count += 1 |
| try: |
| if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: |
| logging.debug("Console Event AGENT_ADDED received") |
| if self._handler: |
| self._handler.agent_added(AgentProxy(self._event.agent, None)) |
| elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: |
| logging.debug("Console Event AGENT_DELETED received") |
| if self._handler: |
| self._handler.agent_deleted(AgentProxy(self._event.agent, None)) |
| elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: |
| logging.debug("Console Event NEW_PACKAGE received") |
| if self._handler: |
| self._handler.new_package(self._event.name) |
| elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: |
| logging.debug("Console Event NEW_CLASS received") |
| if self._handler: |
| self._handler.new_class(SchemaClassKey(self._event.classKey)) |
| elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: |
| logging.debug("Console Event OBJECT_UPDATE received") |
| if self._handler: |
| self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), |
| self._event.hasProps, self._event.hasStats) |
| elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: |
| logging.debug("Console Event EVENT_RECEIVED received") |
| elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: |
| logging.debug("Console Event AGENT_HEARTBEAT received") |
| if self._handler: |
| self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) |
| elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: |
| logging.debug("Console Event METHOD_RESPONSE received") |
| else: |
| logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) |
| except e: |
| print "Exception caught in callback thread:", e |
| self.impl.popEvent() |
| valid = self.impl.getEvent(self._event) |
| return count |
| |
| |
| |
| class AgentProxy: |
| # attr_reader :broker |
| def __init__(self, impl, broker): |
| self.impl = impl |
| self.broker = broker |
| self.key = "%d.%d" % (self.impl.getBrokerBank(), self.impl.getAgentBank()) |
| |
| |
| def label(self): |
| return self.impl.getLabel() |
| |
| |
| def key(self): |
| return self.key |
| |
| |
| class Broker(ConnectionHandler): |
| # attr_reader :impl :conn, :console, :broker_bank |
| def __init__(self, console, conn): |
| self.broker_bank = 1 |
| self.console = console |
| self.conn = conn |
| self._session = None |
| self._cv = Condition() |
| self._stable = None |
| self._event = qmfengine.BrokerEvent() |
| self._xmtMessage = qmfengine.Message() |
| self.impl = qmfengine.BrokerProxy(self.console.impl) |
| self.console.impl.addConnection(self.impl, self) |
| self.conn.add_conn_handler(self) |
| self._operational = True |
| |
| |
| def shutdown(self): |
| logging.debug("broker.shutdown() called.") |
| self.console.impl.delConnection(self.impl) |
| self.conn.del_conn_handler(self) |
| if self._session: |
| self.impl.sessionClosed() |
| logging.debug("broker.shutdown() sessionClosed done.") |
| self._session.destroy() |
| logging.debug("broker.shutdown() session destroy done.") |
| self._session = None |
| self._operational = False |
| logging.debug("broker.shutdown() done.") |
| |
| |
| def wait_for_stable(self, timeout = None): |
| self._cv.acquire() |
| try: |
| if self._stable: |
| return |
| if timeout: |
| self._cv.wait(timeout) |
| if not self._stable: |
| raise Exception("Timed out: waiting for broker connection to become stable") |
| else: |
| while not self._stable: |
| self._cv.wait() |
| finally: |
| self._cv.release() |
| |
| |
| def send_query(self, query, ctx, agent): |
| agent_impl = None |
| if agent: |
| agent_impl = agent.impl |
| self.impl.sendQuery(query, ctx, agent_impl) |
| self.conn.kick() |
| |
| |
| def _do_broker_events(self): |
| count = 0 |
| valid = self.impl.getEvent(self._event) |
| while valid: |
| count += 1 |
| if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: |
| logging.debug("Broker Event BROKER_INFO received"); |
| elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: |
| logging.debug("Broker Event DECLARE_QUEUE received"); |
| self.conn.impl.declareQueue(self._session.handle, self._event.name) |
| elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: |
| logging.debug("Broker Event DELETE_QUEUE received"); |
| self.conn.impl.deleteQueue(self._session.handle, self._event.name) |
| elif self._event.kind == qmfengine.BrokerEvent.BIND: |
| logging.debug("Broker Event BIND received"); |
| self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) |
| elif self._event.kind == qmfengine.BrokerEvent.UNBIND: |
| logging.debug("Broker Event UNBIND received"); |
| self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) |
| elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: |
| logging.debug("Broker Event SETUP_COMPLETE received"); |
| self.impl.startProtocol() |
| elif self._event.kind == qmfengine.BrokerEvent.STABLE: |
| logging.debug("Broker Event STABLE received"); |
| self._cv.acquire() |
| try: |
| self._stable = True |
| self._cv.notify() |
| finally: |
| self._cv.release() |
| elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: |
| result = [] |
| for idx in range(self._event.queryResponse.getObjectCount()): |
| result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) |
| self.console._get_result(result, self._event.context) |
| elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: |
| obj = self._event.context |
| obj._method_result(MethodResponse(self._event.methodResponse())) |
| |
| self.impl.popEvent() |
| valid = self.impl.getEvent(self._event) |
| |
| return count |
| |
| |
| def _do_broker_messages(self): |
| count = 0 |
| valid = self.impl.getXmtMessage(self._xmtMessage) |
| while valid: |
| count += 1 |
| logging.debug("Broker: sending msg on connection") |
| self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) |
| self.impl.popXmt() |
| valid = self.impl.getXmtMessage(self._xmtMessage) |
| |
| return count |
| |
| |
| def _do_events(self): |
| while True: |
| self.console.start_console_events() |
| bcnt = self._do_broker_events() |
| mcnt = self._do_broker_messages() |
| if bcnt == 0 and mcnt == 0: |
| break; |
| |
| |
| def conn_event_connected(self): |
| logging.debug("Broker: Connection event CONNECTED") |
| self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) |
| self.impl.sessionOpened(self._session.handle) |
| self._do_events() |
| |
| |
| def conn_event_disconnected(self, error): |
| logging.debug("Broker: Connection event DISCONNECTED") |
| pass |
| |
| |
| def conn_event_visit(self): |
| self._do_events() |
| |
| |
| def sess_event_session_closed(self, context, error): |
| logging.debug("Broker: Session event CLOSED") |
| self.impl.sessionClosed() |
| |
| |
| def sess_event_recv(self, context, message): |
| logging.debug("Broker: Session event MSG_RECV") |
| if not self._operational: |
| logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) |
| self.impl.handleRcvMessage(message) |
| self._do_events() |
| |
| |
| |
| ##============================================================================== |
| ## AGENT |
| ##============================================================================== |
| |
| |
| |
| class AgentHandler: |
| def get_query(self, context, query, userId): None |
| def method_call(self, context, name, object_id, args, userId): None |
| |
| |
| |
| class Agent(ConnectionHandler): |
| def __init__(self, handler, label=""): |
| if label == "": |
| self._agentLabel = "rb-%s.%d" % (socket.gethostname(), os.getpid()) |
| else: |
| self._agentLabel = label |
| self._conn = None |
| self._handler = handler |
| self.impl = qmfengine.Agent(self._agentLabel) |
| self._event = qmfengine.AgentEvent() |
| self._xmtMessage = qmfengine.Message() |
| |
| |
| def set_connection(self, conn): |
| self._conn = conn |
| self._conn.add_conn_handler(self) |
| |
| |
| def register_class(self, cls): |
| self.impl.registerClass(cls.impl) |
| |
| |
| def alloc_object_id(self, low = 0, high = 0): |
| return ObjectId(self.impl.allocObjectId(low, high)) |
| |
| |
| def raise_event(self, event): |
| self.impl.raiseEvent(event.impl) |
| |
| def query_response(self, context, obj): |
| self.impl.queryResponse(context, obj.impl) |
| |
| |
| def query_complete(self, context): |
| self.impl.queryComplete(context) |
| |
| |
| def method_response(self, context, status, text, arguments): |
| self.impl.methodResponse(context, status, text, arguments.map) |
| |
| |
| def do_agent_events(self): |
| count = 0 |
| valid = self.impl.getEvent(self._event) |
| while valid: |
| count += 1 |
| if self._event.kind == qmfengine.AgentEvent.GET_QUERY: |
| self._handler.get_query(self._event.sequence, |
| Query({"impl":self._event.query}), |
| self._event.authUserId) |
| |
| elif self._event.kind == qmfengine.AgentEvent.START_SYNC: |
| pass |
| elif self._event.kind == qmfengine.AgentEvent.END_SYNC: |
| pass |
| elif self._event.kind == qmfengine.AgentEvent.METHOD_CALL: |
| args = Arguments(self._event.arguments) |
| self._handler.method_call(self._event.sequence, self._event.name, |
| ObjectId(self._event.objectId), |
| args, self._event.authUserId) |
| |
| elif self._event.kind == qmfengine.AgentEvent.DECLARE_QUEUE: |
| self._conn.impl.declareQueue(self._session.handle, self._event.name) |
| |
| elif self._event.kind == qmfengine.AgentEvent.DELETE_QUEUE: |
| self._conn.impl.deleteQueue(self._session.handle, self._event.name) |
| |
| elif self._event.kind == qmfengine.AgentEvent.BIND: |
| self._conn.impl.bind(self._session.handle, self._event.exchange, |
| self._event.name, self._event.bindingKey) |
| |
| elif self._event.kind == qmfengine.AgentEvent.UNBIND: |
| self._conn.impl.unbind(self._session.handle, self._event.exchange, |
| self._event.name, self._event.bindingKey) |
| |
| elif self._event.kind == qmfengine.AgentEvent.SETUP_COMPLETE: |
| self.impl.startProtocol() |
| |
| self.impl.popEvent() |
| valid = self.impl.getEvent(self._event) |
| return count |
| |
| |
| def do_agent_messages(self): |
| count = 0 |
| valid = self.impl.getXmtMessage(self._xmtMessage) |
| while valid: |
| count += 1 |
| self._conn.impl.sendMessage(self._session.handle, self._xmtMessage) |
| self.impl.popXmt() |
| valid = self.impl.getXmtMessage(self._xmtMessage) |
| return count |
| |
| |
| def do_events(self): |
| while True: |
| ecnt = self.do_agent_events() |
| mcnt = self.do_agent_messages() |
| if ecnt == 0 and mcnt == 0: |
| break |
| |
| |
| def conn_event_connected(self): |
| logging.debug("Agent Connection Established...") |
| self._session = Session(self._conn, |
| "qmfa-%s.%d" % (socket.gethostname(), os.getpid()), |
| self) |
| self.impl.newSession() |
| self.do_events() |
| |
| |
| def conn_event_disconnected(self, error): |
| logging.debug("Agent Connection Lost") |
| pass |
| |
| |
| def conn_event_visit(self): |
| self.do_events() |
| |
| |
| def sess_event_session_closed(self, context, error): |
| logging.debug("Agent Session Lost") |
| pass |
| |
| |
| def sess_event_recv(self, context, message): |
| self.impl.handleRcvMessage(message) |
| self.do_events() |
| |
| |