| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| ############################################################################### |
| ## This file is being obsoleted by qmf/console.py |
| ############################################################################### |
| |
| """ |
| Management API for Qpid |
| """ |
| |
| import qpid |
| import struct |
| import socket |
| from threading import Thread |
| from datatypes import Message, RangedSet |
| from time import time |
| from cStringIO import StringIO |
| from codec010 import StringCodec as Codec |
| from threading import Lock, Condition |
| |
| |
| class SequenceManager: |
| """ Manage sequence numbers for asynchronous method calls """ |
| def __init__ (self): |
| self.lock = Lock () |
| self.sequence = 0 |
| self.pending = {} |
| |
| def reserve (self, data): |
| """ Reserve a unique sequence number """ |
| self.lock.acquire () |
| result = self.sequence |
| self.sequence = self.sequence + 1 |
| self.pending[result] = data |
| self.lock.release () |
| return result |
| |
| def release (self, seq): |
| """ Release a reserved sequence number """ |
| data = None |
| self.lock.acquire () |
| if seq in self.pending: |
| data = self.pending[seq] |
| del self.pending[seq] |
| self.lock.release () |
| return data |
| |
| |
| class mgmtObject (object): |
| """ Generic object that holds the contents of a management object with its |
| attributes set as object attributes. """ |
| |
| def __init__ (self, classKey, timestamps, row): |
| self.classKey = classKey |
| self.timestamps = timestamps |
| for cell in row: |
| setattr (self, cell[0], cell[1]) |
| |
| class objectId(object): |
| """ Object that represents QMF object identifiers """ |
| |
| def __init__(self, codec, first=0, second=0): |
| if codec: |
| self.first = codec.read_uint64() |
| self.second = codec.read_uint64() |
| else: |
| self.first = first |
| self.second = second |
| |
| def __cmp__(self, other): |
| if other == None: |
| return 1 |
| if self.first < other.first: |
| return -1 |
| if self.first > other.first: |
| return 1 |
| if self.second < other.second: |
| return -1 |
| if self.second > other.second: |
| return 1 |
| return 0 |
| |
| |
| def index(self): |
| return (self.first, self.second) |
| |
| def getFlags(self): |
| return (self.first & 0xF000000000000000) >> 60 |
| |
| def getSequence(self): |
| return (self.first & 0x0FFF000000000000) >> 48 |
| |
| def getBroker(self): |
| return (self.first & 0x0000FFFFF0000000) >> 28 |
| |
| def getBank(self): |
| return self.first & 0x000000000FFFFFFF |
| |
| def getObject(self): |
| return self.second |
| |
| def isDurable(self): |
| return self.getSequence() == 0 |
| |
| def encode(self, codec): |
| codec.write_uint64(self.first) |
| codec.write_uint64(self.second) |
| |
| |
| class methodResult: |
| """ Object that contains the result of a method call """ |
| |
| def __init__ (self, status, sText, args): |
| self.status = status |
| self.statusText = sText |
| for arg in args: |
| setattr (self, arg, args[arg]) |
| |
| class brokerInfo: |
| """ Object that contains information about a broker and the session to it """ |
| |
| def __init__ (self, brokerId, sessionId): |
| self.brokerId = brokerId |
| self.sessionId = sessionId |
| |
| class managementChannel: |
| """ This class represents a connection to an AMQP broker. """ |
| |
| def __init__ (self, ssn, topicCb, replyCb, exceptionCb, cbContext, _detlife=0): |
| """ Given a channel on an established AMQP broker connection, this method |
| opens a session and performs all of the declarations and bindings needed |
| to participate in the management protocol. """ |
| self.enabled = True |
| self.ssn = ssn |
| self.sessionId = ssn.name |
| self.topicName = "mgmt-%s" % self.sessionId |
| self.replyName = "repl-%s" % self.sessionId |
| self.qpidChannel = ssn |
| self.tcb = topicCb |
| self.rcb = replyCb |
| self.ecb = exceptionCb |
| self.context = cbContext |
| self.reqsOutstanding = 0 |
| self.brokerInfo = None |
| |
| ssn.auto_sync = False |
| ssn.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True) |
| ssn.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True) |
| |
| ssn.exchange_bind (exchange="amq.direct", |
| queue=self.replyName, binding_key=self.replyName) |
| ssn.message_subscribe (queue=self.topicName, destination="tdest", |
| accept_mode=ssn.accept_mode.none, |
| acquire_mode=ssn.acquire_mode.pre_acquired) |
| ssn.message_subscribe (queue=self.replyName, destination="rdest", |
| accept_mode=ssn.accept_mode.none, |
| acquire_mode=ssn.acquire_mode.pre_acquired) |
| |
| ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb) |
| ssn.incoming ("rdest").listen (self.replyCb) |
| |
| ssn.message_set_flow_mode (destination="tdest", flow_mode=1) |
| ssn.message_flow (destination="tdest", unit=0, value=0xFFFFFFFFL) |
| ssn.message_flow (destination="tdest", unit=1, value=0xFFFFFFFFL) |
| |
| ssn.message_set_flow_mode (destination="rdest", flow_mode=1) |
| ssn.message_flow (destination="rdest", unit=0, value=0xFFFFFFFFL) |
| ssn.message_flow (destination="rdest", unit=1, value=0xFFFFFFFFL) |
| |
| def setBrokerInfo (self, data): |
| self.brokerInfo = data |
| |
| def shutdown (self): |
| self.enabled = False |
| self.ssn.incoming("tdest").stop() |
| self.ssn.incoming("rdest").stop() |
| |
| def topicCb (self, msg): |
| """ Receive messages via the topic queue on this channel. """ |
| if self.enabled: |
| self.tcb (self, msg) |
| self.ssn.receiver._completed.add(msg.id) |
| self.ssn.channel.session_completed(self.ssn.receiver._completed) |
| |
| def replyCb (self, msg): |
| """ Receive messages via the reply queue on this channel. """ |
| if self.enabled: |
| self.rcb (self, msg) |
| self.ssn.receiver._completed.add(msg.id) |
| self.ssn.channel.session_completed(self.ssn.receiver._completed) |
| |
| def exceptionCb (self, data): |
| if self.ecb != None: |
| self.ecb (self, data) |
| |
| def send (self, exchange, msg): |
| if self.enabled: |
| self.qpidChannel.message_transfer (destination=exchange, message=msg) |
| |
| def message (self, body, routing_key="broker"): |
| dp = self.qpidChannel.delivery_properties() |
| dp.routing_key = routing_key |
| mp = self.qpidChannel.message_properties() |
| mp.content_type = "application/octet-stream" |
| mp.reply_to = self.qpidChannel.reply_to("amq.direct", self.replyName) |
| return Message(dp, mp, body) |
| |
| |
| class managementClient: |
| """ This class provides an API for access to management data on the AMQP |
| network. It implements the management protocol and manages the management |
| schemas as advertised by the various management agents in the network. """ |
| |
| CTRL_BROKER_INFO = 1 |
| CTRL_SCHEMA_LOADED = 2 |
| CTRL_USER = 3 |
| CTRL_HEARTBEAT = 4 |
| |
| SYNC_TIME = 10.0 |
| |
| #======================================================== |
| # User API - interacts with the class's user |
| #======================================================== |
| def __init__ (self, unused=None, ctrlCb=None, configCb=None, instCb=None, methodCb=None, closeCb=None): |
| self.ctrlCb = ctrlCb |
| self.configCb = configCb |
| self.instCb = instCb |
| self.methodCb = methodCb |
| self.closeCb = closeCb |
| self.schemaCb = None |
| self.eventCb = None |
| self.channels = [] |
| self.seqMgr = SequenceManager () |
| self.schema = {} |
| self.packages = {} |
| self.cv = Condition () |
| self.syncInFlight = False |
| self.syncSequence = 0 |
| self.syncResult = None |
| |
| def schemaListener (self, schemaCb): |
| """ Optionally register a callback to receive details of the schema of |
| managed objects in the network. """ |
| self.schemaCb = schemaCb |
| |
| def eventListener (self, eventCb): |
| """ Optionally register a callback to receive events from managed objects |
| in the network. """ |
| self.eventCb = eventCb |
| |
| def addChannel (self, channel, cbContext=None): |
| """ Register a new channel. """ |
| mch = managementChannel (channel, self.topicCb, self.replyCb, self.exceptCb, cbContext) |
| |
| self.channels.append (mch) |
| self.incOutstanding (mch) |
| codec = Codec () |
| self.setHeader (codec, ord ('B')) |
| msg = mch.message(codec.encoded) |
| mch.send ("qpid.management", msg) |
| return mch |
| |
| def removeChannel (self, mch): |
| """ Remove a previously added channel from management. """ |
| mch.shutdown () |
| self.channels.remove (mch) |
| |
| def callMethod (self, channel, userSequence, objId, className, methodName, args=None): |
| """ Invoke a method on a managed object. """ |
| self.method (channel, userSequence, objId, className, methodName, args) |
| |
| def getObjects (self, channel, userSequence, className, bank=0): |
| """ Request immediate content from broker """ |
| codec = Codec () |
| self.setHeader (codec, ord ('G'), userSequence) |
| ft = {} |
| ft["_class"] = className |
| codec.write_map (ft) |
| msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank) |
| channel.send ("qpid.management", msg) |
| |
| def syncWaitForStable (self, channel): |
| """ Synchronous (blocking) call to wait for schema stability on a channel """ |
| self.cv.acquire () |
| if channel.reqsOutstanding == 0: |
| self.cv.release () |
| return channel.brokerInfo |
| |
| self.syncInFlight = True |
| starttime = time () |
| while channel.reqsOutstanding != 0: |
| self.cv.wait (self.SYNC_TIME) |
| if time () - starttime > self.SYNC_TIME: |
| self.cv.release () |
| raise RuntimeError ("Timed out waiting for response on channel") |
| self.cv.release () |
| return channel.brokerInfo |
| |
| def syncCallMethod (self, channel, objId, className, methodName, args=None): |
| """ Synchronous (blocking) method call """ |
| self.cv.acquire () |
| self.syncInFlight = True |
| self.syncResult = None |
| self.syncSequence = self.seqMgr.reserve ("sync") |
| self.cv.release () |
| self.callMethod (channel, self.syncSequence, objId, className, methodName, args) |
| self.cv.acquire () |
| starttime = time () |
| while self.syncInFlight: |
| self.cv.wait (self.SYNC_TIME) |
| if time () - starttime > self.SYNC_TIME: |
| self.cv.release () |
| raise RuntimeError ("Timed out waiting for response on channel") |
| result = self.syncResult |
| self.cv.release () |
| return result |
| |
| def syncGetObjects (self, channel, className, bank=0): |
| """ Synchronous (blocking) get call """ |
| self.cv.acquire () |
| self.syncInFlight = True |
| self.syncResult = [] |
| self.syncSequence = self.seqMgr.reserve ("sync") |
| self.cv.release () |
| self.getObjects (channel, self.syncSequence, className, bank) |
| self.cv.acquire () |
| starttime = time () |
| while self.syncInFlight: |
| self.cv.wait (self.SYNC_TIME) |
| if time () - starttime > self.SYNC_TIME: |
| self.cv.release () |
| raise RuntimeError ("Timed out waiting for response on channel") |
| result = self.syncResult |
| self.cv.release () |
| return result |
| |
| #======================================================== |
| # Channel API - interacts with registered channel objects |
| #======================================================== |
| def topicCb (self, ch, msg): |
| """ Receive messages via the topic queue of a particular channel. """ |
| codec = Codec (msg.body) |
| while True: |
| hdr = self.checkHeader (codec) |
| if hdr == None: |
| return |
| |
| if hdr[0] == 'p': |
| self.handlePackageInd (ch, codec) |
| elif hdr[0] == 'q': |
| self.handleClassInd (ch, codec) |
| elif hdr[0] == 'h': |
| self.handleHeartbeat (ch, codec) |
| elif hdr[0] == 'e': |
| self.handleEvent (ch, codec) |
| else: |
| self.parse (ch, codec, hdr[0], hdr[1]) |
| |
| def replyCb (self, ch, msg): |
| """ Receive messages via the reply queue of a particular channel. """ |
| codec = Codec (msg.body) |
| while True: |
| hdr = self.checkHeader (codec) |
| if hdr == None: |
| return |
| |
| if hdr[0] == 'm': |
| self.handleMethodReply (ch, codec, hdr[1]) |
| elif hdr[0] == 'z': |
| self.handleCommandComplete (ch, codec, hdr[1]) |
| elif hdr[0] == 'b': |
| self.handleBrokerResponse (ch, codec) |
| elif hdr[0] == 'p': |
| self.handlePackageInd (ch, codec) |
| elif hdr[0] == 'q': |
| self.handleClassInd (ch, codec) |
| else: |
| self.parse (ch, codec, hdr[0], hdr[1]) |
| |
| def exceptCb (self, ch, data): |
| if self.closeCb != None: |
| self.closeCb (ch.context, data) |
| |
| #======================================================== |
| # Internal Functions |
| #======================================================== |
| def setHeader (self, codec, opcode, seq = 0): |
| """ Compose the header of a management message. """ |
| codec.write_uint8 (ord ('A')) |
| codec.write_uint8 (ord ('M')) |
| codec.write_uint8 (ord ('2')) |
| codec.write_uint8 (opcode) |
| codec.write_uint32 (seq) |
| |
| def checkHeader (self, codec): |
| """ Check the header of a management message and extract the opcode and class. """ |
| try: |
| octet = chr (codec.read_uint8 ()) |
| if octet != 'A': |
| return None |
| octet = chr (codec.read_uint8 ()) |
| if octet != 'M': |
| return None |
| octet = chr (codec.read_uint8 ()) |
| if octet != '2': |
| return None |
| opcode = chr (codec.read_uint8 ()) |
| seq = codec.read_uint32 () |
| return (opcode, seq) |
| except: |
| return None |
| |
| def encodeValue (self, codec, value, typecode): |
| """ Encode, into the codec, a value based on its typecode. """ |
| if typecode == 1: |
| codec.write_uint8 (int (value)) |
| elif typecode == 2: |
| codec.write_uint16 (int (value)) |
| elif typecode == 3: |
| codec.write_uint32 (long (value)) |
| elif typecode == 4: |
| codec.write_uint64 (long (value)) |
| elif typecode == 5: |
| codec.write_uint8 (int (value)) |
| elif typecode == 6: |
| codec.write_str8 (value) |
| elif typecode == 7: |
| codec.write_str16 (value) |
| elif typecode == 8: # ABSTIME |
| codec.write_uint64 (long (value)) |
| elif typecode == 9: # DELTATIME |
| codec.write_uint64 (long (value)) |
| elif typecode == 10: # REF |
| value.encode(codec) |
| elif typecode == 11: # BOOL |
| codec.write_uint8 (int (value)) |
| elif typecode == 12: # FLOAT |
| codec.write_float (float (value)) |
| elif typecode == 13: # DOUBLE |
| codec.write_double (float (value)) |
| elif typecode == 14: # UUID |
| codec.write_uuid (value) |
| elif typecode == 15: # FTABLE |
| codec.write_map (value) |
| elif typecode == 16: |
| codec.write_int8 (int(value)) |
| elif typecode == 17: |
| codec.write_int16 (int(value)) |
| elif typecode == 18: |
| codec.write_int32 (int(value)) |
| elif typecode == 19: |
| codec.write_int64 (int(value)) |
| else: |
| raise ValueError ("Invalid type code: %d" % typecode) |
| |
| def decodeValue (self, codec, typecode): |
| """ Decode, from the codec, a value based on its typecode. """ |
| if typecode == 1: |
| data = codec.read_uint8 () |
| elif typecode == 2: |
| data = codec.read_uint16 () |
| elif typecode == 3: |
| data = codec.read_uint32 () |
| elif typecode == 4: |
| data = codec.read_uint64 () |
| elif typecode == 5: |
| data = codec.read_uint8 () |
| elif typecode == 6: |
| data = codec.read_str8 () |
| elif typecode == 7: |
| data = codec.read_str16 () |
| elif typecode == 8: # ABSTIME |
| data = codec.read_uint64 () |
| elif typecode == 9: # DELTATIME |
| data = codec.read_uint64 () |
| elif typecode == 10: # REF |
| data = objectId(codec) |
| elif typecode == 11: # BOOL |
| data = codec.read_uint8 () |
| elif typecode == 12: # FLOAT |
| data = codec.read_float () |
| elif typecode == 13: # DOUBLE |
| data = codec.read_double () |
| elif typecode == 14: # UUID |
| data = codec.read_uuid () |
| elif typecode == 15: # FTABLE |
| data = codec.read_map () |
| elif typecode == 16: |
| data = codec.read_int8 () |
| elif typecode == 17: |
| data = codec.read_int16 () |
| elif typecode == 18: |
| data = codec.read_int32 () |
| elif typecode == 19: |
| data = codec.read_int64 () |
| else: |
| raise ValueError ("Invalid type code: %d" % typecode) |
| return data |
| |
| def incOutstanding (self, ch): |
| self.cv.acquire () |
| ch.reqsOutstanding = ch.reqsOutstanding + 1 |
| self.cv.release () |
| |
| def decOutstanding (self, ch): |
| self.cv.acquire () |
| ch.reqsOutstanding = ch.reqsOutstanding - 1 |
| if ch.reqsOutstanding == 0 and self.syncInFlight: |
| self.syncInFlight = False |
| self.cv.notify () |
| self.cv.release () |
| |
| if ch.reqsOutstanding == 0: |
| if self.ctrlCb != None: |
| self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None) |
| ch.ssn.exchange_bind (exchange="qpid.management", |
| queue=ch.topicName, binding_key="console.#") |
| ch.ssn.exchange_bind (exchange="qpid.management", |
| queue=ch.topicName, binding_key="schema.#") |
| |
| |
| def handleMethodReply (self, ch, codec, sequence): |
| status = codec.read_uint32 () |
| sText = codec.read_str16 () |
| |
| data = self.seqMgr.release (sequence) |
| if data == None: |
| return |
| |
| (userSequence, classId, methodName) = data |
| args = {} |
| context = self.seqMgr.release (userSequence) |
| |
| if status == 0: |
| schemaClass = self.schema[classId] |
| ms = schemaClass['M'] |
| arglist = None |
| for mname in ms: |
| (mdesc, margs) = ms[mname] |
| if mname == methodName: |
| arglist = margs |
| if arglist == None: |
| return |
| |
| for arg in arglist: |
| if arg[2].find("O") != -1: |
| args[arg[0]] = self.decodeValue (codec, arg[1]) |
| |
| if context == "sync" and userSequence == self.syncSequence: |
| self.cv.acquire () |
| self.syncInFlight = False |
| self.syncResult = methodResult (status, sText, args) |
| self.cv.notify () |
| self.cv.release () |
| elif self.methodCb != None: |
| self.methodCb (ch.context, userSequence, status, sText, args) |
| |
| def handleCommandComplete (self, ch, codec, seq): |
| code = codec.read_uint32 () |
| text = codec.read_str8 () |
| data = (seq, code, text) |
| context = self.seqMgr.release (seq) |
| if context == "outstanding": |
| self.decOutstanding (ch) |
| elif context == "sync" and seq == self.syncSequence: |
| self.cv.acquire () |
| self.syncInFlight = False |
| self.cv.notify () |
| self.cv.release () |
| elif self.ctrlCb != None: |
| self.ctrlCb (ch.context, self.CTRL_USER, data) |
| |
| def handleBrokerResponse (self, ch, codec): |
| uuid = codec.read_uuid () |
| ch.brokerInfo = brokerInfo (uuid, ch.sessionId) |
| if self.ctrlCb != None: |
| self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, ch.brokerInfo) |
| |
| # Send a package request |
| sendCodec = Codec () |
| seq = self.seqMgr.reserve ("outstanding") |
| self.setHeader (sendCodec, ord ('P'), seq) |
| smsg = ch.message(sendCodec.encoded) |
| ch.send ("qpid.management", smsg) |
| |
| def handlePackageInd (self, ch, codec): |
| pname = codec.read_str8 () |
| if pname not in self.packages: |
| self.packages[pname] = {} |
| |
| # Send a class request |
| sendCodec = Codec () |
| seq = self.seqMgr.reserve ("outstanding") |
| self.setHeader (sendCodec, ord ('Q'), seq) |
| self.incOutstanding (ch) |
| sendCodec.write_str8 (pname) |
| smsg = ch.message(sendCodec.encoded) |
| ch.send ("qpid.management", smsg) |
| |
| def handleClassInd (self, ch, codec): |
| kind = codec.read_uint8() |
| if kind != 1: # This API doesn't handle new-style events |
| return |
| pname = codec.read_str8() |
| cname = codec.read_str8() |
| hash = codec.read_bin128() |
| if pname not in self.packages: |
| return |
| |
| if (cname, hash) not in self.packages[pname]: |
| # Send a schema request |
| sendCodec = Codec () |
| seq = self.seqMgr.reserve ("outstanding") |
| self.setHeader (sendCodec, ord ('S'), seq) |
| self.incOutstanding (ch) |
| sendCodec.write_str8 (pname) |
| sendCodec.write_str8 (cname) |
| sendCodec.write_bin128 (hash) |
| smsg = ch.message(sendCodec.encoded) |
| ch.send ("qpid.management", smsg) |
| |
| def handleHeartbeat (self, ch, codec): |
| timestamp = codec.read_uint64() |
| if self.ctrlCb != None: |
| self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp) |
| |
| def handleEvent (self, ch, codec): |
| if self.eventCb == None: |
| return |
| timestamp = codec.read_uint64() |
| objId = objectId(codec) |
| packageName = codec.read_str8() |
| className = codec.read_str8() |
| hash = codec.read_bin128() |
| name = codec.read_str8() |
| classKey = (packageName, className, hash) |
| if classKey not in self.schema: |
| return; |
| schemaClass = self.schema[classKey] |
| row = [] |
| es = schemaClass['E'] |
| arglist = None |
| for ename in es: |
| (edesc, eargs) = es[ename] |
| if ename == name: |
| arglist = eargs |
| if arglist == None: |
| return |
| for arg in arglist: |
| row.append((arg[0], self.decodeValue(codec, arg[1]))) |
| self.eventCb(ch.context, classKey, objId, name, row) |
| |
| def parseSchema (self, ch, codec): |
| """ Parse a received schema-description message. """ |
| self.decOutstanding (ch) |
| kind = codec.read_uint8() |
| if kind != 1: # This API doesn't handle new-style events |
| return |
| packageName = codec.read_str8 () |
| className = codec.read_str8 () |
| hash = codec.read_bin128 () |
| hasSupertype = 0 #codec.read_uint8() |
| configCount = codec.read_uint16 () |
| instCount = codec.read_uint16 () |
| methodCount = codec.read_uint16 () |
| if hasSupertype != 0: |
| supertypePackage = codec.read_str8() |
| supertypeClass = codec.read_str8() |
| supertypeHash = codec.read_bin128() |
| |
| if packageName not in self.packages: |
| return |
| if (className, hash) in self.packages[packageName]: |
| return |
| |
| classKey = (packageName, className, hash) |
| if classKey in self.schema: |
| return |
| |
| configs = [] |
| insts = [] |
| methods = {} |
| |
| configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) |
| insts.append (("id", 4, None, None)) |
| |
| for idx in range (configCount): |
| ft = codec.read_map () |
| name = str (ft["name"]) |
| type = ft["type"] |
| access = ft["access"] |
| index = ft["index"] |
| optional = ft["optional"] |
| unit = None |
| min = None |
| max = None |
| maxlen = None |
| desc = None |
| |
| for key, value in ft.items (): |
| if key == "unit": |
| unit = str (value) |
| elif key == "min": |
| min = value |
| elif key == "max": |
| max = value |
| elif key == "maxlen": |
| maxlen = value |
| elif key == "desc": |
| desc = str (value) |
| |
| config = (name, type, unit, desc, access, index, min, max, maxlen, optional) |
| configs.append (config) |
| |
| for idx in range (instCount): |
| ft = codec.read_map () |
| name = str (ft["name"]) |
| type = ft["type"] |
| unit = None |
| desc = None |
| |
| for key, value in ft.items (): |
| if key == "unit": |
| unit = str (value) |
| elif key == "desc": |
| desc = str (value) |
| |
| inst = (name, type, unit, desc) |
| insts.append (inst) |
| |
| for idx in range (methodCount): |
| ft = codec.read_map () |
| mname = str (ft["name"]) |
| argCount = ft["argCount"] |
| if "desc" in ft: |
| mdesc = str (ft["desc"]) |
| else: |
| mdesc = None |
| |
| args = [] |
| for aidx in range (argCount): |
| ft = codec.read_map () |
| name = str (ft["name"]) |
| type = ft["type"] |
| dir = str (ft["dir"].upper ()) |
| unit = None |
| min = None |
| max = None |
| maxlen = None |
| desc = None |
| default = None |
| |
| for key, value in ft.items (): |
| if key == "unit": |
| unit = str (value) |
| elif key == "min": |
| min = value |
| elif key == "max": |
| max = value |
| elif key == "maxlen": |
| maxlen = value |
| elif key == "desc": |
| desc = str (value) |
| elif key == "default": |
| default = str (value) |
| |
| arg = (name, type, dir, unit, desc, min, max, maxlen, default) |
| args.append (arg) |
| methods[mname] = (mdesc, args) |
| |
| schemaClass = {} |
| schemaClass['C'] = configs |
| schemaClass['I'] = insts |
| schemaClass['M'] = methods |
| self.schema[classKey] = schemaClass |
| |
| if self.schemaCb != None: |
| self.schemaCb (ch.context, classKey, configs, insts, methods, {}) |
| |
| def parsePresenceMasks(self, codec, schemaClass): |
| """ Generate a list of not-present properties """ |
| excludeList = [] |
| bit = 0 |
| for element in schemaClass['C'][1:]: |
| if element[9] == 1: |
| if bit == 0: |
| mask = codec.read_uint8() |
| bit = 1 |
| if (mask & bit) == 0: |
| excludeList.append(element[0]) |
| bit = bit * 2 |
| if bit == 256: |
| bit = 0 |
| return excludeList |
| |
| def parseContent (self, ch, cls, codec, seq=0): |
| """ Parse a received content message. """ |
| if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None: |
| return |
| if cls == 'I' and self.instCb == None: |
| return |
| |
| packageName = codec.read_str8 () |
| className = codec.read_str8 () |
| hash = codec.read_bin128 () |
| classKey = (packageName, className, hash) |
| |
| if classKey not in self.schema: |
| return |
| |
| row = [] |
| timestamps = [] |
| |
| timestamps.append (codec.read_uint64 ()) # Current Time |
| timestamps.append (codec.read_uint64 ()) # Create Time |
| timestamps.append (codec.read_uint64 ()) # Delete Time |
| objId = objectId(codec) |
| schemaClass = self.schema[classKey] |
| if cls == 'C' or cls == 'B': |
| notPresent = self.parsePresenceMasks(codec, schemaClass) |
| |
| if cls == 'C' or cls == 'B': |
| row.append(("id", objId)) |
| for element in schemaClass['C'][1:]: |
| tc = element[1] |
| name = element[0] |
| if name in notPresent: |
| row.append((name, None)) |
| else: |
| data = self.decodeValue(codec, tc) |
| row.append((name, data)) |
| |
| if cls == 'I' or cls == 'B': |
| if cls == 'I': |
| row.append(("id", objId)) |
| for element in schemaClass['I'][1:]: |
| tc = element[1] |
| name = element[0] |
| data = self.decodeValue (codec, tc) |
| row.append ((name, data)) |
| |
| if cls == 'C' or (cls == 'B' and seq != self.syncSequence): |
| self.configCb (ch.context, classKey, row, timestamps) |
| elif cls == 'B' and seq == self.syncSequence: |
| if timestamps[2] == 0: |
| obj = mgmtObject (classKey, timestamps, row) |
| self.syncResult.append (obj) |
| elif cls == 'I': |
| self.instCb (ch.context, classKey, row, timestamps) |
| |
| def parse (self, ch, codec, opcode, seq): |
| """ Parse a message received from the topic queue. """ |
| if opcode == 's': |
| self.parseSchema (ch, codec) |
| elif opcode == 'c': |
| self.parseContent (ch, 'C', codec) |
| elif opcode == 'i': |
| self.parseContent (ch, 'I', codec) |
| elif opcode == 'g': |
| self.parseContent (ch, 'B', codec, seq) |
| else: |
| raise ValueError ("Unknown opcode: %c" % opcode); |
| |
| def method (self, channel, userSequence, objId, classId, methodName, args): |
| """ Invoke a method on an object """ |
| codec = Codec () |
| sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) |
| self.setHeader (codec, ord ('M'), sequence) |
| objId.encode(codec) |
| codec.write_str8 (classId[0]) |
| codec.write_str8 (classId[1]) |
| codec.write_bin128 (classId[2]) |
| codec.write_str8 (methodName) |
| bank = "%d.%d" % (objId.getBroker(), objId.getBank()) |
| |
| # Encode args according to schema |
| if classId not in self.schema: |
| self.seqMgr.release (sequence) |
| raise ValueError ("Unknown class name: %s" % classId) |
| |
| schemaClass = self.schema[classId] |
| ms = schemaClass['M'] |
| arglist = None |
| for mname in ms: |
| (mdesc, margs) = ms[mname] |
| if mname == methodName: |
| arglist = margs |
| if arglist == None: |
| self.seqMgr.release (sequence) |
| raise ValueError ("Unknown method name: %s" % methodName) |
| |
| for arg in arglist: |
| if arg[2].find("I") != -1: |
| value = arg[8] # default |
| if arg[0] in args: |
| value = args[arg[0]] |
| if value == None: |
| self.seqMgr.release (sequence) |
| raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) |
| self.encodeValue (codec, value, arg[1]) |
| |
| packageName = classId[0] |
| className = classId[1] |
| msg = channel.message(codec.encoded, "agent." + bank) |
| channel.send ("qpid.management", msg) |