Move the QMFv2 implementation to its own directory.

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmfv2@902858 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/python/qmf2/agent.py b/qpid/python/qmf2/agent.py
new file mode 100644
index 0000000..c6a518c
--- /dev/null
+++ b/qpid/python/qmf2/agent.py
@@ -0,0 +1,758 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import logging
+import datetime
+import time
+import Queue
+from threading import Thread, Lock, currentThread
+from qpid.messaging import Connection, Message, Empty, SendError
+from uuid import uuid4
+from common import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
+                       makeSubject, parseSubject, OpCode, QmfQuery,
+                       SchemaObjectClass, MsgKey, QmfData, QmfAddress,
+                       SchemaClass, SchemaClassId, WorkItem, SchemaMethod) 
+
+# global flag that indicates which thread (if any) is
+# running the agent notifier callback
+_callback_thread=None
+
+  ##==============================================================================
+  ## METHOD CALL
+  ##==============================================================================
+
+class _MethodCallHandle(object):
+    """
+    Private class used to hold context when handing off a method call to the
+    application.  Given to the app in a WorkItem, provided to the agent when
+    method_response() is invoked.
+    """
+    def __init__(self, correlation_id, reply_to, meth_name, _oid=None):
+        self.correlation_id = correlation_id
+        self.reply_to = reply_to
+        self.meth_name = meth_name
+        self.oid = _oid
+
+class MethodCallParams(object):
+    """
+    """
+    def __init__(self, name, _oid=None, _in_args=None, _user_id=None):
+        self._meth_name = name
+        self._oid = _oid
+        self._in_args = _in_args
+        self._user_id = _user_id
+
+    def get_name(self):
+        return self._meth_name
+
+    def get_object_id(self):
+        return self._oid
+
+    def get_args(self):
+        return self._in_args
+
+    def get_user_id(self):
+        return self._user_id
+
+
+
+  ##==============================================================================
+  ## AGENT
+  ##==============================================================================
+
+class Agent(Thread):
+    def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, 
+                 _max_msg_size=0, _capacity=10):
+        Thread.__init__(self)
+        self._running = False
+
+        self.name = str(name)
+        self._domain = _domain
+        self._notifier = _notifier
+        self._heartbeat_interval = _heartbeat_interval
+        self._max_msg_size = _max_msg_size
+        self._capacity = _capacity
+
+        self._conn = None
+        self._session = None
+        self._direct_receiver = None
+        self._locate_receiver = None
+        self._ind_sender = None
+        self._event_sender = None
+
+        self._lock = Lock()
+        self._packages = {}
+        self._schema_timestamp = long(0)
+        self._schema = {}
+        self._agent_data = {}
+        self._work_q = Queue.Queue()
+        self._work_q_put = False
+
+
+    def destroy(self, timeout=None):
+        """
+        Must be called before the Agent is deleted.  
+        Frees up all resources and shuts down all background threads.
+
+        @type timeout: float
+        @param timeout: maximum time in seconds to wait for all background threads to terminate.  Default: forever.
+        """
+        logging.debug("Destroying Agent %s" % self.name)
+        if self._conn:
+            self.remove_connection(timeout)
+        logging.debug("Agent Destroyed")
+
+
+    def get_name(self):
+        return self.name
+
+    def set_connection(self, conn):
+        self._conn = conn
+        self._session = self._conn.session()
+
+        my_addr = QmfAddress.direct(self.name, self._domain)
+        self._direct_receiver = self._session.receiver(str(my_addr) +
+                                                       ";{create:always,"
+                                                       " node-properties:"
+                                                       " {type:topic,"
+                                                       " x-properties:"
+                                                       " {type:direct}}}",
+                                                       capacity=self._capacity)
+        logging.debug("my direct addr=%s" % self._direct_receiver.source)
+
+        locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+        self._locate_receiver = self._session.receiver(str(locate_addr) +
+                                                       ";{create:always,"
+                                                       " node-properties:"
+                                                       " {type:topic}}",
+                                                       capacity=self._capacity)
+        logging.debug("agent.locate addr=%s" % self._locate_receiver.source)
+
+
+        ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+        self._ind_sender = self._session.sender(str(ind_addr) +
+                                                ";{create:always,"
+                                                " node-properties:"
+                                                " {type:topic}}")
+        logging.debug("agent.ind addr=%s" % self._ind_sender.target)
+
+        my_events = QmfAddress.topic(self.name, self._domain)
+        self._event_sender = self._session.sender(str(my_events) +
+                                                  ";{create:always,"
+                                                  " node-properties:"
+                                                  " {type:topic}}")
+        logging.debug("my event addr=%s" % self._event_sender.target)
+
+        self._running = True
+        self.start()
+
+    def remove_connection(self, timeout=None):
+        # tell connection thread to shutdown
+        self._running = False
+        if self.isAlive():
+            # kick my thread to wake it up
+            my_addr = QmfAddress.direct(self.name, self._domain)
+            logging.debug("Making temp sender for [%s]" % str(my_addr))
+            tmp_sender = self._session.sender(str(my_addr))
+            try:
+                msg = Message(subject=makeSubject(OpCode.noop))
+                tmp_sender.send( msg, sync=True )
+            except SendError, e:
+                logging.error(str(e))
+            logging.debug("waiting for agent receiver thread to exit")
+            self.join(timeout)
+            if self.isAlive():
+                logging.error( "Agent thread '%s' is hung..." % self.name)
+        self._direct_receiver.close()
+        self._direct_receiver = None
+        self._locate_receiver.close()
+        self._locate_receiver = None
+        self._ind_sender.close()
+        self._ind_sender = None
+        self._event_sender.close()
+        self._event_sender = None
+        self._session.close()
+        self._session = None
+        self._conn = None
+        logging.debug("agent connection removal complete")
+
+    def register_object_class(self, schema):
+        """
+        Register an instance of a SchemaClass with this agent
+        """
+        # @todo: need to update subscriptions
+        # @todo: need to mark schema as "non-const"
+        if not isinstance(schema, SchemaClass):
+            raise TypeError("SchemaClass instance expected")
+
+        self._lock.acquire()
+        try:
+            classId = schema.get_class_id()
+            pname = classId.get_package_name()
+            cname = classId.get_class_name()
+            if pname not in self._packages:
+                self._packages[pname] = [cname]
+            else:
+                if cname not in self._packages[pname]:
+                    self._packages[pname].append(cname)
+            self._schema[classId] = schema
+            self._schema_timestamp = long(time.time() * 1000)
+        finally:
+            self._lock.release()
+
+    def register_event_class(self, schema):
+        return self.register_object_class(schema)
+
+    def raise_event(self, qmfEvent):
+        """
+        TBD
+        """
+        if not self._event_sender:
+            raise Exception("No connection available")
+
+        # @todo: should we validate against the schema?
+        _map = {"_name": self.get_name(),
+                "_event": qmfEvent.map_encode()}
+        msg = Message(subject=makeSubject(OpCode.event_ind),
+                      properties={"method":"response"},
+                      content={MsgKey.event:_map})
+        self._event_sender.send(msg)
+
+    def add_object(self, data ):
+        """
+        Register an instance of a QmfAgentData object.
+        """
+        # @todo: need to update subscriptions
+        # @todo: need to mark schema as "non-const"
+        if not isinstance(data, QmfAgentData):
+            raise TypeError("QmfAgentData instance expected")
+
+        id_ = data.get_object_id()
+        if not id_:
+            raise TypeError("No identifier assigned to QmfAgentData!")
+
+        self._lock.acquire()
+        try:
+            self._agent_data[id_] = data
+        finally:
+            self._lock.release()
+
+    def get_object(self, id):
+        self._lock.acquire()
+        try:
+            data = self._agent_data.get(id)
+        finally:
+            self._lock.release()
+        return data
+
+
+    def method_response(self, handle, _out_args=None, _error=None): 
+        """
+        """
+        if not isinstance(handle, _MethodCallHandle):
+            raise TypeError("Invalid handle passed to method_response!")
+
+        _map = {SchemaMethod.KEY_NAME:handle.meth_name}
+        if handle.oid is not None:
+            _map[QmfData.KEY_OBJECT_ID] = handle.oid
+        if _out_args is not None:
+            _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
+        if _error is not None:
+            if not isinstance(_error, QmfData):
+                raise TypeError("Invalid type for error - must be QmfData")
+            _map[SchemaMethod.KEY_ERROR] = _error.map_encode()
+
+        msg = Message(subject=makeSubject(OpCode.response),
+                      properties={"method":"response"},
+                      content={MsgKey.method:_map})
+        msg.correlation_id = handle.correlation_id
+
+        try:
+            tmp_snd = self._session.sender( handle.reply_to )
+            tmp_snd.send(msg)
+            logging.debug("method-response sent to [%s]" % handle.reply_to)
+        except SendError, e:
+            logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e)))
+
+    def get_workitem_count(self): 
+        """ 
+        Returns the count of pending WorkItems that can be retrieved.
+        """
+        return self._work_q.qsize()
+
+    def get_next_workitem(self, timeout=None): 
+        """
+        Obtains the next pending work item, or None if none available. 
+        """
+        try:
+            wi = self._work_q.get(True, timeout)
+        except Queue.Empty:
+            return None
+        return wi
+
+    def release_workitem(self, wi): 
+        """
+        Releases a WorkItem instance obtained by getNextWorkItem(). Called when 
+        the application has finished processing the WorkItem. 
+        """
+        pass
+
+
+    def run(self):
+        global _callback_thread
+        next_heartbeat = datetime.datetime.utcnow()
+        batch_limit = 10 # a guess
+        while self._running:
+
+            now = datetime.datetime.utcnow()
+            # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
+            if  now >= next_heartbeat:
+                self._ind_sender.send(self._makeAgentIndMsg())
+                logging.debug("Agent Indication Sent")
+                next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+            timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds 
+            # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
+            try:
+                self._session.next_receiver(timeout=timeout)
+            except Empty:
+                continue
+
+            for i in range(batch_limit):
+                try:
+                    msg = self._locate_receiver.fetch(timeout=0)
+                except Empty:
+                    break
+                if msg and msg.content_type == "amqp/map":
+                    self._dispatch(msg, _direct=False)
+
+            for i in range(batch_limit):
+                try:
+                    msg = self._direct_receiver.fetch(timeout=0)
+                except Empty:
+                    break
+                if msg and msg.content_type == "amqp/map":
+                    self._dispatch(msg, _direct=True)
+
+            if self._work_q_put and self._notifier:
+                # new stuff on work queue, kick the the application...
+                self._work_q_put = False
+                _callback_thread = currentThread()
+                logging.info("Calling agent notifier.indication")
+                self._notifier.indication()
+                _callback_thread = None
+
+    #
+    # Private:
+    #
+
+    def _makeAgentIndMsg(self):
+        """
+        Create an agent indication message identifying this agent
+        """
+        _map = {"_name": self.get_name(),
+                "_schema_timestamp": self._schema_timestamp}
+        return Message( subject=makeSubject(OpCode.agent_ind),
+                        properties={"method":"response"},
+                        content={MsgKey.agent_info: _map})
+
+
+    def _dispatch(self, msg, _direct=False):
+        """
+        Process a message from a console.
+
+        @param _direct: True if msg directly addressed to this agent.
+        """
+        logging.debug( "Message received from Console! [%s]" % msg )
+        try:
+            version,opcode = parseSubject(msg.subject)
+        except:
+            logging.debug("Ignoring unrecognized message '%s'" % msg.subject)
+            return
+
+        cmap = {}; props={}
+        if msg.content_type == "amqp/map":
+            cmap = msg.content
+        if msg.properties:
+            props = msg.properties
+
+        if opcode == OpCode.agent_locate:
+            self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
+        elif opcode == OpCode.get_query:
+            self._handleQueryMsg( msg, cmap, props, version, _direct )
+        elif opcode == OpCode.method_req:
+            self._handleMethodReqMsg(msg, cmap, props, version, _direct)
+        elif opcode == OpCode.cancel_subscription:
+            logging.warning("!!! CANCEL_SUB TBD !!!")
+        elif opcode == OpCode.create_subscription:
+            logging.warning("!!! CREATE_SUB TBD !!!")
+        elif opcode == OpCode.renew_subscription:
+            logging.warning("!!! RENEW_SUB TBD !!!")
+        elif opcode == OpCode.schema_query:
+            logging.warning("!!! SCHEMA_QUERY TBD !!!")
+        elif opcode == OpCode.noop:
+            logging.debug("No-op msg received.")
+        else:
+            logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
+                            % opcode)
+
+    def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ):
+        """
+        Process a received agent-locate message
+        """
+        logging.debug("_handleAgentLocateMsg")
+
+        reply = True
+        if "method" in props and props["method"] == "request":
+            query = cmap.get(MsgKey.query)
+            if query is not None:
+                # fake a QmfData containing my identifier for the query compare
+                tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
+                reply = QmfQuery(query).evaluate(tmpData)
+
+        if reply:
+            try:
+                tmp_snd = self._session.sender( msg.reply_to )
+                m = self._makeAgentIndMsg()
+                m.correlation_id = msg.correlation_id
+                tmp_snd.send(m)
+                logging.debug("agent-ind sent to [%s]" % msg.reply_to)
+            except SendError, e:
+                logging.error("Failed to send reply to agent-ind msg '%s' (%s)" % (msg, str(e)))
+        else:
+            logging.debug("agent-locate msg not mine - no reply sent")
+
+
+    def _handleQueryMsg(self, msg, cmap, props, version, _direct ):
+        """
+        Handle received query message
+        """
+        logging.debug("_handleQueryMsg")
+
+        if "method" in props and props["method"] == "request":
+            qmap = cmap.get(MsgKey.query)
+            if qmap:
+                query = QmfQuery.from_map(qmap)
+                target = query.get_target()
+                if target == QmfQuery.TARGET_PACKAGES:
+                    self._queryPackages( msg, query )
+                elif target == QmfQuery.TARGET_SCHEMA_ID:
+                    self._querySchema( msg, query, _idOnly=True )
+                elif target == QmfQuery.TARGET_SCHEMA:
+                    self._querySchema( msg, query)
+                elif target == QmfQuery.TARGET_AGENT:
+                    logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
+                elif target == QmfQuery.TARGET_OBJECT_ID:
+                    self._queryData(msg, query, _idOnly=True)
+                elif target == QmfQuery.TARGET_OBJECT:
+                    self._queryData(msg, query)
+                else:
+                    logging.warning("Unrecognized query target: '%s'" % str(target))
+
+
+
+    def _handleMethodReqMsg(self, msg, cmap, props, version, _direct):
+        """
+        Process received Method Request
+        """
+        if "method" in props and props["method"] == "request":
+            mname = cmap.get(SchemaMethod.KEY_NAME)
+            if not mname:
+                logging.warning("Invalid method call from '%s': no name"
+                                % msg.reply_to)
+                return
+
+            in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
+            oid = cmap.get(QmfData.KEY_OBJECT_ID)
+
+            print("!!! ci=%s rt=%s mn=%s oid=%s" % 
+                  (msg.correlation_id,
+                   msg.reply_to,
+                   mname,
+                   oid))
+
+            handle = _MethodCallHandle(msg.correlation_id,
+                                       msg.reply_to,
+                                       mname,
+                                       oid)
+            param = MethodCallParams( mname, oid, in_args, msg.user_id)
+            self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
+            self._work_q_put = True
+
+    def _queryPackages(self, msg, query):
+        """
+        Run a query against the list of known packages
+        """
+        pnames = []
+        self._lock.acquire()
+        try:
+            for name in self._packages.iterkeys():
+                if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})):
+                    pnames.append(name)
+        finally:
+            self._lock.release()
+
+        try:
+            tmp_snd = self._session.sender( msg.reply_to )
+            m = Message( subject=makeSubject(OpCode.data_ind),
+                         properties={"method":"response"},
+                         content={MsgKey.package_info: pnames} )
+            if msg.correlation_id != None:
+                m.correlation_id = msg.correlation_id
+            tmp_snd.send(m)
+            logging.debug("package_info sent to [%s]" % msg.reply_to)
+        except SendError, e:
+            logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+
+    def _querySchema( self, msg, query, _idOnly=False ):
+        """
+        """
+        schemas = []
+        # if querying for a specific schema, do a direct lookup
+        if query.get_selector() == QmfQuery.ID:
+            found = None
+            self._lock.acquire()
+            try:
+                found = self._schema.get(query.get_id())
+            finally:
+                self._lock.release()
+            if found:
+                if _idOnly:
+                    schemas.append(query.get_id().map_encode())
+                else:
+                    schemas.append(found.map_encode())
+        else: # otherwise, evaluate all schema
+            self._lock.acquire()
+            try:
+                for sid,val in self._schema.iteritems():
+                    if query.evaluate(val):
+                        if _idOnly:
+                            schemas.append(sid.map_encode())
+                        else:
+                            schemas.append(val.map_encode())
+            finally:
+                self._lock.release()
+
+
+        tmp_snd = self._session.sender( msg.reply_to )
+
+        if _idOnly:
+            content = {MsgKey.schema_id: schemas}
+        else:
+            content = {MsgKey.schema:schemas}
+
+        m = Message( subject=makeSubject(OpCode.data_ind),
+                     properties={"method":"response"},
+                     content=content )
+        if msg.correlation_id != None:
+            m.correlation_id = msg.correlation_id
+        try:
+            tmp_snd.send(m)
+            logging.debug("schema_id sent to [%s]" % msg.reply_to)
+        except SendError, e:
+            logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+
+    def _queryData( self, msg, query, _idOnly=False ):
+        """
+        """
+        data_objs = []
+        # if querying for a specific object, do a direct lookup
+        if query.get_selector() == QmfQuery.ID:
+            found = None
+            self._lock.acquire()
+            try:
+                found = self._agent_data.get(query.get_id())
+            finally:
+                self._lock.release()
+            if found:
+                if _idOnly:
+                    data_objs.append(query.get_id())
+                else:
+                    data_objs.append(found.map_encode())
+        else: # otherwise, evaluate all data
+            self._lock.acquire()
+            try:
+                for oid,val in self._agent_data.iteritems():
+                    if query.evaluate(val):
+                        if _idOnly:
+                            data_objs.append(oid)
+                        else:
+                            data_objs.append(val.map_encode())
+            finally:
+                self._lock.release()
+
+        tmp_snd = self._session.sender( msg.reply_to )
+
+        if _idOnly:
+            content = {MsgKey.object_id:data_objs}
+        else:
+            content = {MsgKey.data_obj:data_objs}
+
+        m = Message( subject=makeSubject(OpCode.data_ind),
+                     properties={"method":"response"},
+                     content=content )
+        if msg.correlation_id != None:
+            m.correlation_id = msg.correlation_id
+        try:
+            tmp_snd.send(m)
+            logging.debug("data reply sent to [%s]" % msg.reply_to)
+        except SendError, e:
+            logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
+
+
+  ##==============================================================================
+  ## DATA MODEL
+  ##==============================================================================
+
+
+class QmfAgentData(QmfData):
+    """
+    A managed data object that is owned by an agent.
+    """
+
+    def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None,
+                 _schema=None):
+        # timestamp in millisec since epoch UTC
+        ctime = long(time.time() * 1000)
+        super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
+                                           _tag=_tag, _ctime=ctime,
+                                           _utime=ctime, _object_id=_object_id,
+                                           _schema=_schema, _const=False)
+        self._agent = agent
+
+    def destroy(self): 
+        self._dtime = long(time.time() * 1000)
+        # @todo: publish change
+
+    def is_deleted(self): 
+        return self._dtime == 0
+
+    def set_value(self, _name, _value, _subType=None):
+        super(QmfAgentData, self).set_value(_name, _value, _subType)
+        # @todo: publish change
+
+    def inc_value(self, name, delta=1):
+        """ add the delta to the property """
+        # @todo: need to take write-lock
+        val = self.get_value(name)
+        try:
+            val += delta
+        except:
+            raise
+        self.set_value(name, val)
+
+    def dec_value(self, name, delta=1): 
+        """ subtract the delta from the property """
+        # @todo: need to take write-lock
+        logging.error(" TBD!!!")
+
+
+################################################################################
+################################################################################
+################################################################################
+################################################################################
+
+if __name__ == '__main__':
+    # static test cases - no message passing, just exercise API
+    from common import (AgentName, SchemaProperty, qmfTypes,
+                           SchemaMethod, SchemaEventClass)
+
+    logging.getLogger().setLevel(logging.INFO)
+
+    logging.info( "Create an Agent" )
+    _agent_name = AgentName("redhat.com", "agent", "tross")
+    _agent = Agent(str(_agent_name))
+
+    logging.info( "Get agent name: '%s'" % _agent.get_name())
+
+    logging.info( "Create SchemaObjectClass" )
+
+    _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"),
+                                _desc="A test data schema",
+                                _object_id_names=["index1", "index2"])
+    # add properties
+    _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+    _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR)) 
+
+    # these two properties are statistics
+    _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+    _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+    # These two properties can be set via the method call
+    _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+    _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+    # add method
+    _meth = SchemaMethod(_desc="Method to set string and int in object." )
+    _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+    _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+    _schema.add_method( "set_meth", _meth )
+
+    # Add schema to Agent
+
+    print("Schema Map='%s'" % str(_schema.map_encode()))
+
+    _agent.register_object_class(_schema)
+
+    # instantiate managed data objects matching the schema
+
+    logging.info( "Create QmfAgentData" )
+
+    _obj = QmfAgentData( _agent, _schema=_schema )
+    _obj.set_value("index1", 100)
+    _obj.set_value("index2", "a name" )
+    _obj.set_value("set_string", "UNSET")
+    _obj.set_value("set_int", 0)
+    _obj.set_value("query_count", 0)
+    _obj.set_value("method_call_count", 0)
+
+    print("Obj1 Map='%s'" % str(_obj.map_encode()))
+
+    _agent.add_object( _obj )
+
+    _obj = QmfAgentData( _agent, 
+                         _values={"index1":99, 
+                                  "index2": "another name",
+                                  "set_string": "UNSET",
+                                  "set_int": 0,
+                                  "query_count": 0,
+                                  "method_call_count": 0},
+                         _schema=_schema)
+
+    print("Obj2 Map='%s'" % str(_obj.map_encode()))
+
+    _agent.add_object(_obj)
+
+    ##############
+
+
+
+    logging.info( "Create SchemaEventClass" )
+
+    _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent",
+                                            stype=SchemaClassId.TYPE_EVENT),
+                              _desc="A test data schema",
+                              _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)})
+    _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR)) 
+
+    print("Event Map='%s'" % str(_event.map_encode()))
+
+    _agent.register_event_class(_event)
diff --git a/qpid/python/qmf2/common.py b/qpid/python/qmf2/common.py
new file mode 100644
index 0000000..061c9fb
--- /dev/null
+++ b/qpid/python/qmf2/common.py
@@ -0,0 +1,1881 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import time
+import logging
+from threading import Lock
+from threading import Condition
+try:
+    import hashlib
+    _md5Obj = hashlib.md5
+except ImportError:
+    import md5
+    _md5Obj = md5.new
+
+
+
+
+##
+## Constants
+##
+
+
+AMQP_QMF_AGENT_LOCATE = "agent.locate"
+AMQP_QMF_AGENT_INDICATION = "agent.ind"
+AMQP_QMF_AGENT_EVENT="agent.event"
+# agent.ind[.<agent-name>]
+# agent.event.<sev>.<agent-name>
+# sev="strings"
+#
+
+AMQP_QMF_SUBJECT = "qmf"
+AMQP_QMF_VERSION = 4
+AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
+
+class MsgKey(object):
+    agent_info = "agent_info"
+    query = "query"
+    package_info = "package_info"
+    schema_id = "schema_id"
+    schema = "schema"
+    object_id="object_id"
+    data_obj="object"
+    method="method"
+    event="event"
+
+
+class OpCode(object):
+    noop = "noop"
+
+    # codes sent by a console and processed by the agent
+    agent_locate = "agent-locate"
+    cancel_subscription = "cancel-subscription"
+    create_subscription = "create-subscription"
+    get_query = "get-query"
+    method_req = "method"
+    renew_subscription = "renew-subscription"
+    schema_query = "schema-query"  # @todo: deprecate
+
+    # codes sent by the agent to a console
+    agent_ind = "agent"
+    data_ind = "data"
+    event_ind = "event"
+    managed_object = "managed-object"
+    object_ind = "object"
+    response = "response"
+    schema_ind="schema"   # @todo: deprecate
+
+
+
+
+def makeSubject(_code): 
+    """
+    Create a message subject field value.
+    """
+    return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code)
+
+
+def parseSubject(_sub):
+    """
+    Deconstruct a subject field, return version,opcode values
+    """
+    if _sub[:3] != "qmf":
+        raise Exception("Non-QMF message received")
+
+    return _sub[3:].split('.', 1)
+
+
+##==============================================================================
+## Async Event Model
+##==============================================================================
+
+
+class Notifier(object):
+    """
+    Virtual base class that defines a call back which alerts the application that
+    a QMF Console notification is pending.
+    """
+    def indication(self):
+        """
+        Called when one or more items are ready for the application to process.
+        This method may be called by an internal QMF library thread.  Its purpose is to
+        indicate that the application should process pending work items.
+        """
+        raise Exception("The indication method must be overridden by the application!")
+
+
+
+class WorkItem(object):
+    """
+    Describes an event that has arrived for the application to process.  The
+    Notifier is invoked when one or more of these WorkItems become available
+    for processing. 
+    """
+    # Enumeration of the types of WorkItems produced on the Console
+    AGENT_ADDED=1
+    AGENT_DELETED=2
+    NEW_PACKAGE=3
+    NEW_CLASS=4
+    OBJECT_UPDATE=5
+    EVENT_RECEIVED=7
+    AGENT_HEARTBEAT=8
+    # Enumeration of the types of WorkItems produced on the Agent
+    METHOD_CALL=1000
+    QUERY=1001
+    SUBSCRIBE=1002
+    UNSUBSCRIBE=1003
+
+    def __init__(self, kind, handle, _params=None):
+        """
+        Used by the Console to create a work item.
+        
+        @type kind: int
+        @param kind: work item type
+        """
+        self._kind = kind
+        self._handle = handle
+        self._params = _params
+
+    def get_type(self):
+        return self._kind
+
+    def get_handle(self):
+        return self._handle
+
+    def get_params(self):
+        return self._params
+
+
+
+##==============================================================================
+## Addressing
+##==============================================================================
+
+class QmfAddress(object):
+    """
+    TBD
+    """
+    TYPE_DIRECT = "direct"
+    TYPE_TOPIC = "topic"
+
+    ADDRESS_FMT = "qmf.%s.%s/%s"
+    DEFAULT_DOMAIN = "default"
+
+
+    def __init__(self, name, domain, type_):
+        self._name = name
+        self._domain = domain
+        self._type = type_
+
+    def _direct(cls, name, _domain=None):
+        if _domain is None:
+            _domain = QmfAddress.DEFAULT_DOMAIN
+        return cls(name, _domain, type_=QmfAddress.TYPE_DIRECT)
+    direct = classmethod(_direct)
+
+    def _topic(cls, name, _domain=None):
+        if _domain is None:
+            _domain = QmfAddress.DEFAULT_DOMAIN
+        return cls(name, _domain, type_=QmfAddress.TYPE_TOPIC)
+    topic = classmethod(_topic)
+
+
+    def get_address(self):
+        return str(self)
+
+    def __repr__(self):
+        return QmfAddress.ADDRESS_FMT % (self._domain, self._type, self._name)
+
+
+
+
+class AgentName(object):
+    """
+    Uniquely identifies a management agent within the management domain.
+    """
+    _separator = ":"
+
+    def __init__(self, vendor, product, name, _str=None):
+        """
+        Note: this object must be immutable, as it is used to index into a dictionary
+        """
+        if _str is not None:
+            # construct from string representation
+            if _str.count(AgentName._separator) < 2:
+                raise TypeError("AgentName string format must be 'vendor.product.name'")
+            self._vendor, self._product, self._name = _str.split(AgentName._separator)
+        else:
+            self._vendor = vendor
+            self._product = product
+            self._name = name
+
+
+    def _from_str(cls, str_):
+        return cls(None, None, None, str_=str_)
+    from_str = classmethod(_from_str)
+
+    def vendor(self):
+        return self._vendor
+
+    def product(self):
+        return self._product
+
+    def name(self):
+        return self._name
+
+    def __cmp__(self, other):
+        if not isinstance(other, AgentName) :
+            raise TypeError("Invalid types for compare")
+            # return 1
+        me = str(self)
+        them = str(other)
+
+        if me < them:
+            return -1
+        if me > them:
+            return 1
+        return 0
+
+    def __hash__(self):
+        return (self._vendor, self._product, self._name).__hash__()
+
+    def __repr__(self):
+        return self._vendor + AgentName._separator + \
+            self._product + AgentName._separator + \
+            self._name
+
+
+
+##==============================================================================
+## DATA MODEL
+##==============================================================================
+
+
+class _mapEncoder(object):
+    """ 
+    virtual base class for all objects that support being converted to a map
+    """
+
+    def map_encode(self):
+        raise Exception("The map_encode method my be overridden.")
+
+
+class QmfData(_mapEncoder):
+    """
+    Base data class representing arbitrarily structure data.  No schema or 
+    managing agent is associated with data of this class.
+
+    Map format:
+    map["_values"] = map of unordered "name"=<value> pairs (optional)
+    map["_subtype"] = map of unordered "name"="subtype string" pairs (optional)
+    map["_tag"] = application-specific tag for this instance (optional)
+    """
+    KEY_VALUES = "_values"
+    KEY_SUBTYPES = "_subtypes"
+    KEY_TAG="_tag"
+    KEY_OBJECT_ID = "_object_id"
+    KEY_SCHEMA_ID = "_schema_id"
+    KEY_UPDATE_TS = "_update_ts"
+    KEY_CREATE_TS = "_create_ts"
+    KEY_DELETE_TS = "_delete_ts"
+
+    def __init__(self,
+                 _values={}, _subtypes={}, _tag=None, _object_id=None,
+                 _ctime = 0, _utime = 0, _dtime = 0,
+                 _map=None,
+                 _schema=None, _const=False):
+        """
+        @type _values: dict
+        @param _values: dictionary of initial name=value pairs for object's
+        named data. 
+        @type _subtypes: dict
+        @param _subtype: dictionary of subtype strings for each of the object's
+        named data. 
+        @type _desc: string
+        @param _desc: Human-readable description of this data object.
+        @type _const: boolean
+        @param _const: if true, this object cannot be modified
+        """
+        self._schema_id = None
+        if _map is not None:
+            # construct from map
+            _tag = _map.get(self.KEY_TAG, _tag)
+            _values = _map.get(self.KEY_VALUES, _values)
+            _subtypes = _map.get(self.KEY_SUBTYPES, _subtypes)
+            _object_id = _map.get(self.KEY_OBJECT_ID, _object_id)
+            sid = _map.get(self.KEY_SCHEMA_ID)
+            if sid:
+                self._schema_id = SchemaClassId(_map=sid)
+            _ctime = long(_map.get(self.KEY_CREATE_TS, _ctime))
+            _utime = long(_map.get(self.KEY_UPDATE_TS, _utime))
+            _dtime = long(_map.get(self.KEY_DELETE_TS, _dtime))
+
+        self._values = _values.copy()
+        self._subtypes = _subtypes.copy()
+        self._tag = _tag
+        self._ctime = _ctime
+        self._utime = _utime
+        self._dtime = _dtime
+        self._const = _const
+
+        if _object_id is not None:
+            self._object_id = str(_object_id)
+        else:
+            self._object_id = None
+
+        if _schema is not None:
+            self._set_schema(_schema)
+        else:
+            # careful: map constructor may have already set self._schema_id, do
+            # not override it!
+            self._schema = None
+
+    def _create(cls, values, _subtypes={}, _tag=None, _object_id=None,
+                _schema=None, _const=False):
+        # timestamp in millisec since epoch UTC
+        ctime = long(time.time() * 1000)
+        return cls(_values=values, _subtypes=_subtypes, _tag=_tag,
+                   _ctime=ctime, _utime=ctime,
+                   _object_id=_object_id, _schema=_schema, _const=_const)
+    create = classmethod(_create)
+
+    def __from_map(cls, map_, _schema=None, _const=False):
+        return cls(_map=map_, _schema=_schema, _const=_const)
+    from_map = classmethod(__from_map)
+
+    def is_managed(self):
+        return self._object_id is not None
+
+    def is_described(self):
+        return self._schema_id is not None
+
+    def get_tag(self):
+        return self._tag
+
+    def get_value(self, name):
+        # meta-properties:
+        if name == SchemaClassId.KEY_PACKAGE:
+            if self._schema_id:
+                return self._schema_id.get_package_name()
+            return None
+        if name == SchemaClassId.KEY_CLASS:
+            if self._schema_id:
+                return self._schema_id.get_class_name()
+            return None
+        if name == SchemaClassId.KEY_TYPE:
+            if self._schema_id:
+                return self._schema_id.get_type()
+            return None
+        if name == SchemaClassId.KEY_HASH:
+            if self._schema_id:
+                return self._schema_id.get_hash_string()
+            return None
+        if name == self.KEY_SCHEMA_ID:
+            return self._schema_id
+        if name == self.KEY_OBJECT_ID:
+            return self._object_id
+        if name == self.KEY_TAG:
+            return self._tag
+        if name == self.KEY_UPDATE_TS:
+            return self._utime
+        if name == self.KEY_CREATE_TS:
+            return self._ctime
+        if name == self.KEY_DELETE_TS:
+            return self._dtime
+
+        return self._values.get(name)
+
+    def has_value(self, name):
+
+        if name in [SchemaClassId.KEY_PACKAGE, SchemaClassId.KEY_CLASS,
+                    SchemaClassId.KEY_TYPE, SchemaClassId.KEY_HASH,
+                    self.KEY_SCHEMA_ID]:
+            return self._schema_id is not None
+        if name in [self.KEY_UPDATE_TS, self.KEY_CREATE_TS,
+                    self.KEY_DELETE_TS]:
+            return True
+        if name == self.KEY_OBJECT_ID:
+            return self._object_id is not None
+        if name == self.KEY_TAG:
+            return self._tag is not None
+
+        return name in self._values
+
+    def set_value(self, _name, _value, _subType=None):
+        if self._const:
+            raise Exception("cannot modify constant data object")
+        self._values[_name] = _value
+        if _subType:
+            self._subtypes[_name] = _subType
+        return _value
+
+    def get_subtype(self, _name):
+        return self._subtypes.get(_name)
+
+    def get_schema_class_id(self): 
+        """
+        @rtype: class SchemaClassId
+        @returns: the identifier of the Schema that describes the structure of the data.
+        """
+        return self._schema_id
+
+    def get_object_id(self): 
+        """
+        Get the instance's identification string.
+        @rtype: str
+        @returns: the identification string, or None if not assigned and id. 
+        """
+        if self._object_id:
+            return self._object_id
+
+        # if object id not assigned, see if schema defines a set of field
+        # values to use as an id
+        if not self._schema: 
+            return None
+
+        ids = self._schema.get_id_names()
+        if not ids:
+            return None
+
+        if not self._validated:
+            self._validate()
+
+        result = u""
+        for key in ids:
+            try:
+                result += unicode(self._values[key])
+            except:
+                logging.error("get_object_id(): cannot convert value '%s'."
+                              % key)
+                return None
+        self._object_id = result
+        return result
+
+    def map_encode(self):
+        _map = {}
+        if self._tag:
+            _map[self.KEY_TAG] = self._tag
+
+        # data in the _values map may require recursive map_encode()
+        vmap = {}
+        for name,val in self._values.iteritems():
+            if isinstance(val, _mapEncoder):
+                vmap[name] = val.map_encode()
+            else:
+                # otherwise, just toss in the native type...
+                vmap[name] = val
+
+        _map[self.KEY_VALUES] = vmap
+        # subtypes are never complex, so safe to just copy
+        _map[self.KEY_SUBTYPES] = self._subtypes.copy()
+        if self._object_id:
+            _map[self.KEY_OBJECT_ID] = self._object_id
+        if self._schema_id:
+            _map[self.KEY_SCHEMA_ID] = self._schema_id.map_encode()
+        return _map
+
+    def _set_schema(self, schema):
+        self._validated = False
+        self._schema = schema
+        if schema:
+            self._schema_id = schema.get_class_id()
+            if self._const:
+                self._validate()
+        else:
+            self._schema_id = None
+
+    def _validate(self):
+        """
+        Compares this object's data against the associated schema.  Throws an 
+        exception if the data does not conform to the schema.
+        """
+        props = self._schema.get_properties()
+        for name,val in props.iteritems():
+            # @todo validate: type compatible with amqp_type?
+            # @todo validate: primary keys have values
+            if name not in self._values:
+                if val._isOptional:
+                    # ok not to be present, put in dummy value
+                    # to simplify access
+                    self._values[name] = None
+                else:
+                    raise Exception("Required property '%s' not present." % name)
+        self._validated = True
+
+    def __repr__(self):
+        return "QmfData=<<" + str(self.map_encode()) + ">>"
+        
+
+    def __setattr__(self, _name, _value):
+        # ignore private data members
+        if _name[0] == '_':
+            return super(QmfData, self).__setattr__(_name, _value)
+        if _name in self._values:
+            return self.set_value(_name, _value)
+        return super(QmfData, self).__setattr__(_name, _value)
+
+    def __getattr__(self, _name):
+        if _name != "_values" and _name in self._values: 
+            return self._values[_name]
+        raise AttributeError("no value named '%s' in this object" % _name)
+
+    def __getitem__(self, _name):
+        return self.__getattr__(_name)
+
+    def __setitem__(self, _name, _value):
+        return self.__setattr__(_name, _value)
+
+
+
+class QmfEvent(QmfData):
+    """
+    A QMF Event is a type of described data that is not managed. Events are 
+    notifications that are sent by Agents. An event notifies a Console of a 
+    change in some aspect of the system under managment.
+    """
+    KEY_TIMESTAMP = "_timestamp"
+    KEY_SEVERITY = "_severity"
+
+    SEV_EMERG = "emerg"
+    SEV_ALERT = "alert"
+    SEV_CRIT = "crit"
+    SEV_ERR = "err"
+    SEV_WARNING = "warning"
+    SEV_NOTICE = "notice"
+    SEV_INFO = "info"
+    SEV_DEBUG = "debug"
+
+    def __init__(self, _timestamp=None, _sev=SEV_NOTICE, _values={},
+                 _subtypes={}, _tag=None,
+                 _map=None,
+                 _schema=None, _const=True):
+        """
+        @type _map: dict
+        @param _map: if not None, construct instance from map representation. 
+        @type _timestamp: int
+        @param _timestamp: moment in time when event occurred, expressed
+               as milliseconds since Midnight, Jan 1, 1970 UTC.
+        @type _agentId: class AgentId
+        @param _agentId: Identifies agent issuing this event.
+        @type _schema: class Schema
+        @param _schema:
+        @type _schemaId: class SchemaClassId (event)
+        @param _schemaId: identi
+        """
+
+        if _map is not None:
+            # construct from map
+            super(QmfEvent, self).__init__(_map=_map, _schema=_schema,
+                                           _const=_const)
+            _timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp)
+            _sev = _map.get(self.KEY_SEVERITY, _sev)
+        else:
+            super(QmfEvent, self).__init__(_values=_values,
+                                           _subtypes=_subtypes, _tag=_tag,
+                                           _schema=_schema, _const=_const)
+        if _timestamp is None:
+            raise TypeError("QmfEvent: a valid timestamp is required.")
+
+        try:
+            self._timestamp = long(_timestamp)
+        except:
+            raise TypeError("QmfEvent: a numeric timestamp is required.")
+
+        self._severity = _sev
+
+    def _create(cls, timestamp, severity, values,
+                _subtypes={}, _tag=None, _schema=None, _const=False):
+        return cls(_timestamp=timestamp, _sev=severity, _values=values,
+                _subtypes=_subtypes, _tag=_tag, _schema=_schema, _const=_const)
+    create = classmethod(_create)
+
+    def _from_map(cls, map_, _schema=None, _const=False):
+        return cls(_map=map_, _schema=_schema, _const=_const)
+    from_map = classmethod(_from_map)
+
+    def get_timestamp(self): 
+        return self._timestamp
+
+    def get_severity(self):
+        return self._severity
+
+    def map_encode(self):
+        _map = super(QmfEvent, self).map_encode()
+        _map[self.KEY_TIMESTAMP] = self._timestamp
+        _map[self.KEY_SEVERITY] = self._severity
+        return _map
+
+
+
+
+
+#==============================================================================
+#==============================================================================
+#==============================================================================
+
+
+
+
+class Arguments(object):
+    def __init__(self, map):
+        pass
+#         self.map = map
+#         self._by_hash = {}
+#         key_count = self.map.keyCount()
+#         a = 0
+#         while a < key_count:
+#             self._by_hash[self.map.key(a)] = self.by_key(self.map.key(a))
+#             a += 1
+    
+    
+#     def __getitem__(self, key):
+#         return self._by_hash[key]
+    
+    
+#     def __setitem__(self, key, value):
+#         self._by_hash[key] = value
+#         self.set(key, value)
+    
+    
+#     def __iter__(self):
+#         return self._by_hash.__iter__
+
+
+#     def __getattr__(self, name):
+#         if name in self._by_hash:
+#             return self._by_hash[name]
+#         return super.__getattr__(self, name)
+
+
+#     def __setattr__(self, name, value):
+#         #
+#         # ignore local data members
+#         #
+#         if (name[0] == '_' or
+#             name == 'map'):
+#             return super.__setattr__(self, name, value)
+
+#         if name in self._by_hash:
+#             self._by_hash[name] = value
+#             return self.set(name, value)
+
+#         return super.__setattr__(self, name, value)
+
+
+#     def by_key(self, key):
+#         val = self.map.byKey(key)
+#         vType = val.getType()
+#         if vType == TYPE_UINT8: return val.asUint()
+#         elif vType == TYPE_UINT16: return val.asUint()
+#         elif vType == TYPE_UINT32: return val.asUint()
+#         elif vType == TYPE_UINT64: return val.asUint64()
+#         elif vType == TYPE_SSTR: return val.asString()
+#         elif vType == TYPE_LSTR: return val.asString()
+#         elif vType == TYPE_ABSTIME:   return val.asInt64()
+#         elif vType == TYPE_DELTATIME: return val.asUint64()
+#         elif vType == TYPE_REF:  return ObjectId(val.asObjectId())
+#         elif vType == TYPE_BOOL: return val.asBool()
+#         elif vType == TYPE_FLOAT:  return val.asFloat()
+#         elif vType == TYPE_DOUBLE: return val.asDouble()
+#         elif vType == TYPE_UUID: return val.asUuid()
+#         elif vType == TYPE_INT8: return val.asInt()
+#         elif vType == TYPE_INT16: return val.asInt()
+#         elif vType == TYPE_INT32: return val.asInt()
+#         elif vType == TYPE_INT64: return val.asInt64()
+#         else:
+#             # when TYPE_MAP
+#             # when TYPE_OBJECT
+#             # when TYPE_LIST
+#             # when TYPE_ARRAY
+#             logging.error( "Unsupported Type for Get? '%s'" % str(val.getType()))
+#             return None
+    
+    
+#     def set(self, key, value):
+#         val = self.map.byKey(key)
+#         vType = val.getType()
+#         if vType == TYPE_UINT8: return val.setUint(value)
+#         elif vType == TYPE_UINT16: return val.setUint(value)
+#         elif vType == TYPE_UINT32: return val.setUint(value)
+#         elif vType == TYPE_UINT64: return val.setUint64(value)
+#         elif vType == TYPE_SSTR: 
+#             if value:
+#                 return val.setString(value)
+#             else:
+#                 return val.setString('')
+#         elif vType == TYPE_LSTR:
+#             if value:
+#                 return val.setString(value)
+#             else:
+#                 return val.setString('')
+#         elif vType == TYPE_ABSTIME: return val.setInt64(value)
+#         elif vType == TYPE_DELTATIME: return val.setUint64(value)
+#         elif vType == TYPE_REF: return val.setObjectId(value.impl)
+#         elif vType == TYPE_BOOL: return val.setBool(value)
+#         elif vType == TYPE_FLOAT: return val.setFloat(value)
+#         elif vType == TYPE_DOUBLE: return val.setDouble(value)
+#         elif vType == TYPE_UUID: return val.setUuid(value)
+#         elif vType == TYPE_INT8: return val.setInt(value)
+#         elif vType == TYPE_INT16: return val.setInt(value)
+#         elif vType == TYPE_INT32: return val.setInt(value)
+#         elif vType == TYPE_INT64: return val.setInt64(value)
+#         else:
+#             # when TYPE_MAP
+#             # when TYPE_OBJECT
+#             # when TYPE_LIST
+#             # when TYPE_ARRAY
+#             logging.error("Unsupported Type for Set? '%s'" % str(val.getType()))
+#             return None
+
+
+
+#class MethodResponse(object):
+#    def __init__(self, impl):
+#        pass
+#         self.impl = qmfengine.MethodResponse(impl)
+
+
+#     def status(self):
+#         return self.impl.getStatus()
+
+
+#     def exception(self):
+#         return self.impl.getException()
+
+
+#     def text(self):
+#         return exception().asString()
+
+
+#     def args(self):
+#         return Arguments(self.impl.getArgs())
+
+
+#     def __getattr__(self, name):
+#         myArgs = self.args()
+#         return myArgs.__getattr__(name)
+
+
+#     def __setattr__(self, name, value):
+#         if name == 'impl':
+#             return super.__setattr__(self, name, value)
+
+#         myArgs = self.args()
+#         return myArgs.__setattr__(name, value)
+
+
+
+#   ##==============================================================================
+#   ## QUERY
+#   ##==============================================================================
+
+
+
+# def _doQuery(predicate, params ):
+#     """
+#     Given the predicate from a query, and a map of named parameters, apply the predicate
+#     to the parameters, and return True or False.
+#     """
+#     if type(predicate) != list or len(predicate) < 1:
+#         return False
+
+#     elif opr == Query._LOGIC_AND:
+#         logging.debug("_doQuery() AND: [%s]" % predicate )
+#         rc = False
+#         for exp in predicate[1:]:
+#             rc = _doQuery( exp, params )
+#             if not rc:
+#                 break
+#         return rc
+
+#     elif opr == Query._LOGIC_OR:
+#         logging.debug("_doQuery() OR: [%s]" % predicate )
+#         rc = False
+#         for exp in predicate[1:]:
+#             rc = _doQuery( exp, params )
+#             if rc:
+#                 break
+#         return rc
+
+#     elif opr == Query._LOGIC_NOT:
+#         logging.debug("_doQuery() NOT: [%s]" % predicate )
+#         if len(predicate) != 2:
+#             logging.warning("Malformed query not-expression received: '%s'" % predicate)
+#             return False
+#         return not _doQuery( predicate[1:], params )
+
+
+
+#     else:
+#         logging.warning("Unknown query operator received: '%s'" % opr)
+#     return False
+
+
+
+class QmfQuery(_mapEncoder):
+
+    KEY_TARGET="what"
+    KEY_PREDICATE="where"
+    KEY_ID="id"
+
+    ### Query Types
+    ID=1
+    PREDICATE=2
+
+    #### Query Targets ####
+    TARGET_PACKAGES="schema_package"
+    # (returns just package names)
+    # allowed predicate key(s):
+    #
+    # SchemaClassId.KEY_PACKAGE
+
+    TARGET_SCHEMA_ID="schema_id"
+    TARGET_SCHEMA="schema"
+    # allowed predicate key(s):
+    #
+    # SchemaClassId.KEY_PACKAGE
+    # SchemaClassId.KEY_CLASS
+    # SchemaClassId.KEY_TYPE
+    # SchemaClassId.KEY_HASH
+    # SchemaClass.KEY_SCHEMA_ID
+    # name of property (exist test only)
+    # name of method (exist test only)
+
+    TARGET_AGENT="agent"
+    # allowed predicate keys(s):
+    #
+    KEY_AGENT_NAME="_name"
+
+    TARGET_OBJECT_ID="object_id"
+    TARGET_OBJECT="object"
+    # allowed predicate keys(s):
+    #
+    # SchemaClassId.KEY_PACKAGE
+    # SchemaClassId.KEY_CLASS
+    # SchemaClassId.KEY_TYPE
+    # SchemaClassId.KEY_HASH
+    # QmfData.KEY_SCHEMA_ID
+    # QmfData.KEY_OBJECT_ID
+    # QmfData.KEY_UPDATE_TS
+    # QmfData.KEY_CREATE_TS
+    # QmfData.KEY_DELETE_TS
+    # <name of data value>
+
+    CMP_EQ="eq"
+    CMP_NE="ne"
+    CMP_LT="lt"
+    CMP_LE="le"
+    CMP_GT="gt"
+    CMP_GE="ge"
+    CMP_RE_MATCH="re_match"
+    CMP_EXISTS="exists"
+    CMP_TRUE="true"
+    CMP_FALSE="false"
+
+    LOGIC_AND="and"
+    LOGIC_OR="or"
+    LOGIC_NOT="not"
+
+    _valid_targets = [TARGET_PACKAGES, TARGET_OBJECT_ID, TARGET_SCHEMA, TARGET_SCHEMA_ID, 
+                      TARGET_OBJECT, TARGET_AGENT]
+
+    def __init__(self, _target=None, _target_params=None, _predicate=None,
+                 _id=None, _map=None):
+        """
+        """
+        if _map is not None:
+            target_map = _map.get(self.KEY_TARGET)
+            if not target_map:
+                raise TypeError("QmfQuery requires a target map")
+
+            _target = None
+            for key in target_map.iterkeys():
+                if key in self._valid_targets:
+                    _target = key
+                    break
+
+            _target_params = target_map.get(_target)
+
+            _id = _map.get(self.KEY_ID)
+            if _id is not None:
+                # Convert identifier to native type if necessary
+                if _target == self.TARGET_SCHEMA:
+                    _id = SchemaClassId.from_map(_id)
+            else: 
+                pred = _map.get(self.KEY_PREDICATE)
+                if pred:
+                    _predicate = QmfQueryPredicate(pred)
+
+        self._target = _target
+        if not self._target:
+            raise TypeError("QmfQuery requires a target value")
+        self._target_params = _target_params
+        self._predicate = _predicate
+        self._id = _id
+
+    # constructors
+    def _create_wildcard(cls, target, _target_params=None):
+        return cls(_target=target, _target_params=_target_params)
+    create_wildcard = classmethod(_create_wildcard)
+
+    def _create_predicate(cls, target, predicate, _target_params=None): 
+        return cls(_target=target, _target_params=_target_params,
+                   _predicate=predicate)
+    create_predicate = classmethod(_create_predicate)
+
+    def _create_id(cls, target, ident, _target_params=None): 
+        return cls(_target=target, _target_params=_target_params, _id=ident)
+    create_id = classmethod(_create_id)
+
+    def _from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(_from_map)
+
+    def get_target(self):
+        return self._target
+
+    def get_target_param(self):
+        return self._target_params
+
+    def get_selector(self):
+        if self._id:
+            return QmfQuery.ID
+        else:
+            return QmfQuery.PREDICATE
+
+    def get_id(self):
+        return self._id
+
+    def get_predicate(self):
+        """
+        """
+        return self._predicate
+
+    def evaluate(self, qmfData):
+        """
+        """
+        if self._id:
+            if self._target == self.TARGET_SCHEMA:
+                return (qmfData.has_value(qmfData.KEY_SCHEMA_ID) and
+                        qmfData.get_value(qmfData.KEY_SCHEMA_ID) == self._id)
+            elif self._target == self.TARGET_OBJECT:
+                return (qmfData.has_value(qmfData.KEY_OBJECT_ID) and
+                        qmfData.get_value(qmfData.KEY_OBJECT_ID) == self._id)
+            elif self._target == self.TARGET_AGENT:
+                return (qmfData.has_value(self.KEY_AGENT_NAME) and
+                        qmfData.get_value(self.KEY_AGENT_NAME) == self._id)
+
+            raise Exception("Unsupported query target '%s'" % str(self._target))
+
+        if self._predicate:
+            return self._predicate.evaluate(qmfData)
+        # no predicate and no id - always match
+        return True
+
+    def map_encode(self):
+        _map = {self.KEY_TARGET: {self._target: self._target_params}}
+        if self._id is not None:
+            if isinstance(self._id, _mapEncoder):
+                _map[self.KEY_ID] = self._id.map_encode()
+            else:
+                _map[self.KEY_ID] = self._id
+        elif self._predicate is not None:
+            _map[self.KEY_PREDICATE] = self._predicate.map_encode()
+        return _map
+
+    def __repr__(self):
+        return "QmfQuery=<<" + str(self.map_encode()) + ">>"
+
+
+
+class QmfQueryPredicate(_mapEncoder):
+    """
+    Class for Query predicates.
+    """
+    _valid_cmp_ops = [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT, 
+                      QmfQuery.CMP_GT, QmfQuery.CMP_LE, QmfQuery.CMP_GE,
+                      QmfQuery.CMP_EXISTS, QmfQuery.CMP_RE_MATCH,
+                      QmfQuery.CMP_TRUE, QmfQuery.CMP_FALSE]
+    _valid_logic_ops = [QmfQuery.LOGIC_AND, QmfQuery.LOGIC_OR, QmfQuery.LOGIC_NOT]
+
+
+    def __init__( self, pmap):
+        """
+        {"op": listOf(operands)}
+        """
+        self._oper = None
+        self._operands = []
+
+        logic_op = False
+        if type(pmap) == dict:
+            for key in pmap.iterkeys():
+                if key in self._valid_cmp_ops:
+                    # comparison operation - may have "name" and "value"
+                    self._oper = key
+                    break
+                if key in self._valid_logic_ops:
+                    logic_op = True
+                    self._oper = key
+                    break
+
+            if not self._oper:
+                raise TypeError("invalid predicate expression: '%s'" % str(pmap))
+
+            if type(pmap[self._oper]) == list or type(pmap[self._oper]) == tuple:
+                if logic_op:
+                    for exp in pmap[self._oper]:
+                        self.append(QmfQueryPredicate(exp))
+                else:
+                    self._operands = list(pmap[self._oper])
+
+        else:
+            raise TypeError("invalid predicate: '%s'" % str(pmap))
+
+
+    def append(self, operand):
+        """
+        Append another operand to a predicate expression
+        """
+        self._operands.append(operand)
+
+
+
+    def evaluate( self, qmfData ):
+        """
+        """
+        if not isinstance(qmfData, QmfData):
+            raise TypeError("Query expects to evaluate QmfData types.")
+
+        if self._oper == QmfQuery.CMP_TRUE:
+            logging.debug("query evaluate TRUE")
+            return True
+        if self._oper == QmfQuery.CMP_FALSE:
+            logging.debug("query evaluate FALSE")
+            return False
+
+        if self._oper in [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT, 
+                          QmfQuery.CMP_LE, QmfQuery.CMP_GT, QmfQuery.CMP_GE,
+                          QmfQuery.CMP_RE_MATCH]:
+            if len(self._operands) != 2:
+                logging.warning("Malformed query compare expression received: '%s, %s'" %
+                                (self._oper, str(self._operands)))
+                return False
+            # @todo: support regular expression match
+            name = self._operands[0]
+            logging.debug("looking for: '%s'" % str(name))
+            if not qmfData.has_value(name):
+                logging.warning("Malformed query, attribute '%s' not present."
+                          % name)
+                return False
+
+            arg1 = qmfData.get_value(name)
+            arg2 = self._operands[1]
+            logging.debug("query evaluate %s: '%s' '%s' '%s'" % 
+                          (name, str(arg1), self._oper, str(arg2)))
+            try:
+                if self._oper == QmfQuery.CMP_EQ: return arg1 == arg2
+                if self._oper == QmfQuery.CMP_NE: return arg1 != arg2
+                if self._oper == QmfQuery.CMP_LT: return arg1 < arg2
+                if self._oper == QmfQuery.CMP_LE: return arg1 <= arg2
+                if self._oper == QmfQuery.CMP_GT: return arg1 > arg2
+                if self._oper == QmfQuery.CMP_GE: return arg1 >= arg2
+                if self._oper == QmfQuery.CMP_RE_MATCH: 
+                    logging.error("!!! RE QUERY TBD !!!")
+                    return False
+            except:
+                pass
+            logging.warning("Malformed query - %s: '%s' '%s' '%s'" % 
+                            (name, str(arg1), self._oper, str(self._operands[1])))
+            return False
+
+
+        if self._oper == QmfQuery.CMP_EXISTS:
+            if len(self._operands) != 1:
+                logging.warning("Malformed query present expression received")
+                return False
+            name = self._operands[0]
+            logging.debug("query evaluate PRESENT: [%s]" % str(name))
+            return qmfData.has_value(name)
+
+        if self._oper == QmfQuery.LOGIC_AND:
+            logging.debug("query evaluate AND: '%s'" % str(self._operands))
+            for exp in self._operands:
+                if not exp.evaluate(qmfData):
+                    return False
+            return True
+
+        if self._oper == QmfQuery.LOGIC_OR:
+            logging.debug("query evaluate OR: [%s]" % str(self._operands))
+            for exp in self._operands:
+                if exp.evaluate(qmfData):
+                    return True
+            return False
+
+        if self._oper == QmfQuery.LOGIC_NOT:
+            logging.debug("query evaluate NOT: [%s]" % str(self._operands))
+            for exp in self._operands:
+                if exp.evaluate(qmfData):
+                    return False
+            return True
+
+        logging.warning("Unrecognized query operator: [%s]" % str(self._oper))
+        return False
+
+    
+    def map_encode(self):
+        _map = {}
+        _list = []
+        for exp in self._operands:
+            if isinstance(exp, QmfQueryPredicate):
+                _list.append(exp.map_encode())
+            else:
+                _list.append(exp)
+        _map[self._oper] = _list
+        return _map
+
+
+    def __repr__(self):
+        return "QmfQueryPredicate=<<" + str(self.map_encode()) + ">>"
+    
+
+
+##==============================================================================
+## SCHEMA
+##==============================================================================
+
+
+# Argument typecodes, access, and direction qualifiers
+
+class qmfTypes(object):
+    TYPE_UINT8      = 1
+    TYPE_UINT16     = 2
+    TYPE_UINT32     = 3
+    TYPE_UINT64     = 4
+
+    TYPE_SSTR       = 6
+    TYPE_LSTR       = 7
+
+    TYPE_ABSTIME    = 8
+    TYPE_DELTATIME  = 9
+
+    TYPE_REF        = 10
+
+    TYPE_BOOL       = 11
+
+    TYPE_FLOAT      = 12
+    TYPE_DOUBLE     = 13
+
+    TYPE_UUID       = 14
+
+    TYPE_MAP        = 15
+
+    TYPE_INT8       = 16
+    TYPE_INT16      = 17
+    TYPE_INT32      = 18
+    TYPE_INT64      = 19
+
+    TYPE_OBJECT     = 20
+
+    TYPE_LIST       = 21
+
+    TYPE_ARRAY      = 22
+
+# New subtypes:
+# integer (for time, duration, signed/unsigned)
+# double (float)
+# bool
+# string
+# map (ref, qmfdata)
+# list
+# uuid
+
+
+class qmfAccess(object):
+    READ_CREATE = 1 
+    READ_WRITE = 2
+    READ_ONLY = 3
+
+
+class qmfDirection(object):
+    DIR_IN = 1
+    DIR_OUT = 2
+    DIR_IN_OUT = 3
+
+
+
+def _toBool( param ):
+    """
+    Helper routine to convert human-readable representations of
+    boolean values to python bool types.
+    """
+    _false_strings = ["off", "no", "false", "0", "none"]
+    _true_strings =  ["on",  "yes", "true", "1"]
+    if type(param) == str:
+        lparam = param.lower()
+        if lparam in _false_strings:
+            return False
+        if lparam in _true_strings:
+            return True
+        raise TypeError("unrecognized boolean string: '%s'" % param )
+    else:
+        return bool(param)
+
+
+
+class SchemaClassId(_mapEncoder):
+    """ 
+    Unique identifier for an instance of a SchemaClass.
+
+    Map format:
+    map["package_name"] = str, name of associated package
+    map["class_name"] = str, name of associated class
+    map["type"] = str, "data"|"event", default: "data"
+    optional:
+    map["hash_str"] = str, hash value in standard format or None 
+    if hash is unknown.
+    """
+    KEY_PACKAGE="_package_name"
+    KEY_CLASS="_class_name"
+    KEY_TYPE="_type"
+    KEY_HASH="_hash_str"
+
+    TYPE_DATA = "_data"
+    TYPE_EVENT = "_event"
+
+    _valid_types=[TYPE_DATA, TYPE_EVENT]
+    _schemaHashStrFormat = "%08x-%08x-%08x-%08x"
+    _schemaHashStrDefault = "00000000-00000000-00000000-00000000"
+
+    def __init__(self, pname=None, cname=None, stype=TYPE_DATA, hstr=None,
+                 _map=None): 
+        """
+        @type pname: str
+        @param pname: the name of the class's package
+        @type cname: str
+        @param cname: name of the class
+        @type stype: str
+        @param stype: schema type [data | event]
+        @type hstr: str
+        @param hstr: the hash value in '%08x-%08x-%08x-%08x' format
+        """
+        if _map is not None:
+            # construct from map
+            pname = _map.get(self.KEY_PACKAGE, pname)
+            cname = _map.get(self.KEY_CLASS, cname)
+            stype = _map.get(self.KEY_TYPE, stype)
+            hstr = _map.get(self.KEY_HASH, hstr)
+
+        self._pname = pname
+        self._cname = cname
+        if stype not in SchemaClassId._valid_types:
+            raise TypeError("Invalid SchemaClassId type: '%s'" % stype)
+        self._type = stype
+        self._hstr = hstr
+        if self._hstr:
+            try:
+                # sanity check the format of the hash string
+                hexValues = hstr.split("-")
+                h0 = int(hexValues[0], 16)
+                h1 = int(hexValues[1], 16)
+                h2 = int(hexValues[2], 16)
+                h3 = int(hexValues[3], 16)
+            except:
+                raise Exception("Invalid SchemaClassId format: bad hash string: '%s':"
+                                % hstr)
+    # constructor
+    def _create(cls, pname, cname, stype=TYPE_DATA, hstr=None):
+        return cls(pname=pname, cname=cname, stype=stype, hstr=hstr)
+    create = classmethod(_create)
+
+    # map constructor 
+    def _from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(_from_map)
+
+    def get_package_name(self): 
+        """ 
+        Access the package name in the SchemaClassId.
+
+        @rtype: str
+        """
+        return self._pname
+
+
+    def get_class_name(self): 
+        """ 
+        Access the class name in the SchemaClassId
+
+        @rtype: str
+        """
+        return self._cname
+
+
+    def get_hash_string(self): 
+        """ 
+        Access the schema's hash as a string value
+
+        @rtype: str
+        """
+        return self._hstr
+
+
+    def get_type(self):
+        """ 
+        Returns the type code associated with this Schema
+
+        @rtype: str
+        """
+        return self._type
+
+    def map_encode(self):
+        _map = {}
+        _map[self.KEY_PACKAGE] = self._pname
+        _map[self.KEY_CLASS] = self._cname
+        _map[self.KEY_TYPE] = self._type
+        if self._hstr: _map[self.KEY_HASH] = self._hstr
+        return _map
+
+    def __repr__(self):
+        hstr = self.get_hash_string()
+        if not hstr:
+            hstr = SchemaClassId._schemaHashStrDefault
+        return self._pname + ":" + self._cname + ":" + self._type +  "(" + hstr  + ")"
+
+
+    def __cmp__(self, other):
+        if isinstance(other, dict):
+            other = SchemaClassId.from_map(other)
+        if not isinstance(other, SchemaClassId):
+            raise TypeError("Invalid types for compare")
+            # return 1
+        me = str(self)
+        them = str(other)
+        if me < them:
+            return -1
+        if me > them:
+            return 1
+        return 0
+
+
+    def __hash__(self):
+        return (self._pname, self._cname, self._hstr).__hash__()
+
+
+
+class SchemaProperty(_mapEncoder):
+    """
+    Describes the structure of a Property data object.
+    Map format:
+    map["amqp_type"] = int, AMQP type code indicating property's data type
+    
+    optional:
+    map["access"] = str, access allowed to this property, default "RO"
+    map["index"] = bool, True if this property is an index value, default False
+    map["optional"] = bool, True if this property is optional, default False
+    map["unit"] = str, describes units used
+    map["min"] = int, minimum allowed value
+    map["max"] = int, maximun allowed value
+    map["maxlen"] = int, if string type, this is the maximum length in bytes 
+    required to represent the longest instance of this string.
+    map["desc"] = str, human-readable description of this argument
+    map["reference"] = str, ???
+    map["parent_ref"] = bool, true if this property references an object  in
+    which this object is in a child-parent relationship. Default False
+    """
+    __hash__ = None
+    _access_strings = ["RO","RW","RC"]
+    _dir_strings = ["I", "O", "IO"]
+    def __init__(self, _type_code=None, _map=None, kwargs={}):
+        if _map is not None:
+            # construct from map
+            _type_code = _map.get("amqp_type", _type_code)
+            kwargs = _map
+            if not _type_code:
+                raise TypeError("SchemaProperty: amqp_type is a mandatory"
+                                " parameter")
+
+        self._type = _type_code
+        self._access  = "RO"
+        self._isIndex   = False
+        self._isOptional = False
+        self._unit    = None
+        self._min     = None
+        self._max     = None
+        self._maxlen  = None
+        self._desc    = None
+        self._reference = None
+        self._isParentRef  = False
+        self._dir = None
+        self._default = None
+
+        for key, value in kwargs.items():
+            if key == "access":
+                value = str(value).upper()
+                if value not in self._access_strings:
+                    raise TypeError("invalid value for access parameter: '%s':" % value )
+                self._access = value
+            elif key == "index"   : self._isIndex = _toBool(value)
+            elif key == "optional": self._isOptional = _toBool(value)
+            elif key == "unit"    : self._unit    = value
+            elif key == "min"     : self._min     = value
+            elif key == "max"     : self._max     = value
+            elif key == "maxlen"  : self._maxlen  = value
+            elif key == "desc"    : self._desc    = value
+            elif key == "reference" : self._reference = value
+            elif key == "parent_ref"   : self._isParentRef = _toBool(value)
+            elif key == "dir":
+                value = str(value).upper()
+                if value not in self._dir_strings:
+                    raise TypeError("invalid value for direction parameter: '%s'" % value)
+                self._dir = value
+            elif key == "default" : self._default = value
+
+    # constructor
+    def _create(cls, type_code, kwargs={}):
+        return cls(_type_code=type_code, kwargs=kwargs)
+    create = classmethod(_create)
+
+    # map constructor
+    def _from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(_from_map)
+
+    def getType(self): return self._type
+
+    def getAccess(self): return self._access
+
+    def is_optional(self): return self._isOptional
+
+    def isIndex(self): return self._isIndex
+
+    def getUnit(self): return self._unit
+
+    def getMin(self): return self._min
+
+    def getMax(self): return self._max
+
+    def getMaxLen(self): return self._maxlen
+
+    def getDesc(self): return self._desc
+
+    def getReference(self): return self._reference
+
+    def isParentRef(self): return self._isParentRef
+
+    def get_direction(self): return self._dir
+
+    def get_default(self): return self._default
+
+    def map_encode(self):
+        """
+        Return the map encoding of this schema.
+        """
+        _map = {}
+        _map["amqp_type"] = self._type
+        _map["access"] = self._access
+        _map["index"] = self._isIndex
+        _map["optional"] = self._isOptional
+        if self._unit: _map["unit"] = self._unit
+        if self._min:  _map["min"] = self._min
+        if self._max:  _map["max"] = self._max
+        if self._maxlen: _map["maxlen"] = self._maxlen
+        if self._desc: _map["desc"] = self._desc
+        if self._reference: _map["reference"] = self._reference
+        _map["parent_ref"] = self._isParentRef
+        if self._dir: _map["dir"] = self._dir
+        if self._default: _map["default"] = self._default
+        return _map
+
+    def __repr__(self): 
+        return "SchemaProperty=<<" + str(self.map_encode()) + ">>"
+
+    def _updateHash(self, hasher):
+        """
+        Update the given hash object with a hash computed over this schema.
+        """
+        hasher.update(str(self._type))
+        hasher.update(str(self._isIndex))
+        hasher.update(str(self._isOptional))
+        if self._access: hasher.update(self._access)
+        if self._unit: hasher.update(self._unit)
+        if self._desc: hasher.update(self._desc)
+        if self._dir: hasher.update(self._dir)
+        if self._default: hasher.update(self._default)
+
+
+
+class SchemaMethod(_mapEncoder):
+    """ 
+    The SchemaMethod class describes the method's structure, and contains a
+    SchemaProperty class for each argument declared by the method.
+
+    Map format:
+    map["arguments"] = map of "name"=<SchemaProperty> pairs.
+    map["desc"] = str, description of the method
+    """
+    KEY_NAME="_name"
+    KEY_ARGUMENTS="_arguments"
+    KEY_DESC="_desc"
+    KEY_ERROR="_error"
+    def __init__(self, _args={}, _desc=None, _map=None):
+        """
+        Construct a SchemaMethod.
+
+        @type args: map of "name"=<SchemaProperty> objects
+        @param args: describes the arguments accepted by the method
+        @type _desc: str
+        @param _desc: Human-readable description of the schema
+        """
+        if _map is not None:
+            _desc = _map.get(self.KEY_DESC)
+            margs = _map.get(self.KEY_ARGUMENTS)
+            if margs:
+                # margs are in map format - covert to SchemaProperty
+                tmp_args = {}
+                for name,val in margs.iteritems():
+                    tmp_args[name] = SchemaProperty.from_map(val)
+                _args=tmp_args
+
+        self._arguments = _args.copy()
+        self._desc = _desc
+
+    # map constructor
+    def _from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(_from_map)
+
+    def get_desc(self): return self._desc
+
+    def get_arg_count(self): return len(self._arguments)
+
+    def get_arguments(self): return self._arguments.copy()
+
+    def get_argument(self, name): return self._arguments.get(name)
+
+    def add_argument(self, name, schema):
+        """
+        Add an argument to the list of arguments passed to this method.  
+        Used by an agent for dynamically creating method schema.
+
+        @type name: string
+        @param name: name of new argument
+        @type schema: SchemaProperty
+        @param schema: SchemaProperty to add to this method
+        """
+        if not isinstance(schema, SchemaProperty):
+            raise TypeError("argument must be a SchemaProperty class")
+        # "Input" argument, by default
+        if schema._dir is None:
+            schema._dir = "I"
+        self._arguments[name] = schema
+
+    def map_encode(self):
+        """
+        Return the map encoding of this schema.
+        """
+        _map = {}
+        _args = {}
+        for name,val in self._arguments.iteritems():
+            _args[name] = val.map_encode()
+        _map[self.KEY_ARGUMENTS] = _args
+        if self._desc: _map[self.KEY_DESC] = self._desc
+        return _map
+
+    def __repr__(self):
+        result = "SchemaMethod=<<args=("
+        first = True
+        for name,arg in self._arguments.iteritems():
+            if first:
+                first = False
+            else:
+                result += ", "
+            result += name
+        result += ")>>"
+        return result
+
+    def _updateHash(self, hasher):
+        """
+        Update the given hash object with a hash computed over this schema.
+        """
+        for name,val in self._arguments.iteritems():
+            hasher.update(name)
+            val._updateHash(hasher)
+        if self._desc: hasher.update(self._desc)
+
+
+
+class SchemaClass(QmfData):
+    """
+    Base class for Data and Event Schema classes.
+
+    Map format:
+    map(QmfData), plus:
+    map["_schema_id"] = map representation of a SchemaClassId instance
+    map["_primary_key_names"] = order list of primary key names
+    """
+    KEY_PRIMARY_KEY_NAMES="_primary_key_names"
+    KEY_DESC = "_desc"
+
+    SUBTYPE_PROPERTY="qmfProperty"
+    SUBTYPE_METHOD="qmfMethod"
+
+    def __init__(self, _classId=None, _desc=None, _map=None):
+        """
+        Schema Class constructor.
+
+        @type classId: class SchemaClassId
+        @param classId: Identifier for this SchemaClass
+        @type _desc: str
+        @param _desc: Human-readable description of the schema
+        """
+        if _map is not None:
+            super(SchemaClass, self).__init__(_map=_map)
+
+            # decode each value based on its type
+            for name,value in self._values.iteritems():
+                if self._subtypes.get(name) == self.SUBTYPE_METHOD:
+                    self._values[name] = SchemaMethod.from_map(value)
+                else:
+                    self._values[name] = SchemaProperty.from_map(value)
+            cid = _map.get(self.KEY_SCHEMA_ID)
+            if cid:
+                _classId = SchemaClassId.from_map(cid)
+            self._object_id_names = _map.get(self.KEY_PRIMARY_KEY_NAMES,[])
+            _desc = _map.get(self.KEY_DESC)
+        else:
+            super(SchemaClass, self).__init__()
+            self._object_id_names = []
+
+        self._classId = _classId
+        self._desc = _desc
+
+    def get_class_id(self): 
+        if not self._classId.get_hash_string():
+            self.generate_hash()
+        return self._classId
+
+    def get_desc(self): return self._desc
+
+    def generate_hash(self): 
+        """
+        generate an md5 hash over the body of the schema,
+        and return a string representation of the hash
+        in format "%08x-%08x-%08x-%08x"
+        """
+        md5Hash = _md5Obj()
+        md5Hash.update(self._classId.get_package_name())
+        md5Hash.update(self._classId.get_class_name())
+        md5Hash.update(self._classId.get_type())
+        for name,x in self._values.iteritems():
+            md5Hash.update(name)
+            x._updateHash( md5Hash )
+        for name,value in self._subtypes.iteritems():
+            md5Hash.update(name)
+            md5Hash.update(value)
+        idx = 0
+        for name in self._object_id_names:
+            md5Hash.update(str(idx) + name)
+            idx += 1
+        hstr = md5Hash.hexdigest()[0:8] + "-" +\
+            md5Hash.hexdigest()[8:16] + "-" +\
+            md5Hash.hexdigest()[16:24] + "-" +\
+            md5Hash.hexdigest()[24:32]
+        # update classId with new hash value
+        self._classId._hstr = hstr
+        return hstr
+
+
+    def get_property_count(self): 
+        count = 0
+        for value in self._subtypes.itervalues():
+            if value == self.SUBTYPE_PROPERTY:
+                count += 1
+        return count
+
+    def get_properties(self): 
+        props = {}
+        for name,value in self._subtypes.iteritems():
+            if value == self.SUBTYPE_PROPERTY:
+                props[name] = self._values.get(name)
+        return props
+
+    def get_property(self, name):
+        if self._subtypes.get(name) == self.SUBTYPE_PROPERTY:
+            return self._values.get(name)
+        return None
+
+    def add_property(self, name, prop):
+        self.set_value(name, prop, self.SUBTYPE_PROPERTY)
+        # need to re-generate schema hash
+        self._classId._hstr = None
+
+    def get_value(self, name):
+        # check for meta-properties first
+        if name == SchemaClassId.KEY_PACKAGE:
+            return self._classId.get_package_name()
+        if name == SchemaClassId.KEY_CLASS:
+            return self._classId.get_class_name()
+        if name == SchemaClassId.KEY_TYPE:
+            return self._classId.get_type()
+        if name == SchemaClassId.KEY_HASH:
+            return self.get_class_id().get_hash_string()
+        if name == self.KEY_SCHEMA_ID:
+            return self.get_class_id()
+        if name == self.KEY_PRIMARY_KEY_NAMES:
+            return self._object_id_names[:]
+        return super(SchemaClass, self).get_value(name)
+
+    def has_value(self, name):
+        if name in [SchemaClassId.KEY_PACKAGE, SchemaClassId.KEY_CLASS, SchemaClassId.KEY_TYPE,
+                    SchemaClassId.KEY_HASH, self.KEY_SCHEMA_ID, self.KEY_PRIMARY_KEY_NAMES]:
+            return True
+        super(SchemaClass, self).has_value(name)
+
+    def map_encode(self):
+        """
+        Return the map encoding of this schema.
+        """
+        _map = super(SchemaClass,self).map_encode()
+        _map[self.KEY_SCHEMA_ID] = self.get_class_id().map_encode()
+        if self._object_id_names:
+            _map[self.KEY_PRIMARY_KEY_NAMES] = self._object_id_names[:]
+        if self._desc:
+            _map[self.KEY_DESC] = self._desc
+        return _map
+
+    def __repr__(self):
+        return str(self.get_class_id())
+
+
+
+class SchemaObjectClass(SchemaClass):
+    """
+    A schema class that describes a data object.  The data object is composed 
+    of zero or more properties and methods.  An instance of the SchemaObjectClass
+    can be identified using a key generated by concantenating the values of
+    all properties named in the primary key list.
+    
+    Map format:
+    map(SchemaClass)
+    """
+    def __init__(self, _classId=None, _desc=None, 
+                 _props={}, _methods={}, _object_id_names=None, 
+                 _map=None):
+        """
+        @type pname: str
+        @param pname: name of package this schema belongs to
+        @type cname: str
+        @param cname: class name for this schema
+        @type desc: str
+        @param desc: Human-readable description of the schema
+        @type _hash: str
+        @param _methods: hash computed on the body of this schema, if known
+        @type _props: map of 'name':<SchemaProperty> objects
+        @param _props: all properties provided by this schema
+        @type _pkey: list of strings
+        @param _pkey: names of each property to be used for constructing the primary key
+        @type _methods: map of 'name':<SchemaMethod> objects
+        @param _methods: all methods provided by this schema
+        """
+        if _map is not None:
+            super(SchemaObjectClass,self).__init__(_map=_map)
+        else:
+            super(SchemaObjectClass, self).__init__(_classId=_classId, _desc=_desc)
+            self._object_id_names = _object_id_names
+            for name,value in _props.iteritems():
+                self.set_value(name, value, self.SUBTYPE_PROPERTY)
+            for name,value in _methods.iteritems():
+                self.set_value(name, value, self.SUBTYPE_METHOD)
+
+        if self._classId.get_type() != SchemaClassId.TYPE_DATA:
+            raise TypeError("Invalid ClassId type for data schema: %s" % self._classId)
+
+    # map constructor
+    def __from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(__from_map)
+
+    def get_id_names(self): 
+        return self._object_id_names[:]
+
+    def get_method_count(self):
+        count = 0
+        for value in self._subtypes.itervalues():
+            if value == self.SUBTYPE_METHOD:
+                count += 1
+        return count
+
+    def get_methods(self):
+        meths = {}
+        for name,value in self._subtypes.iteritems():
+            if value == self.SUBTYPE_METHOD:
+                meths[name] = self._values.get(name)
+        return meths
+
+    def get_method(self, name):
+        if self._subtypes.get(name) == self.SUBTYPE_METHOD:
+            return self._values.get(name)
+        return None
+
+    def add_method(self, name, method): 
+        self.set_value(name, method, self.SUBTYPE_METHOD)
+        # need to re-generate schema hash
+        self._classId._hstr = None
+
+
+
+
+class SchemaEventClass(SchemaClass):
+    """
+    A schema class that describes an event.  The event is composed
+    of zero or more properties.
+    
+    Map format:
+    map["schema_id"] = map, SchemaClassId map for this object.
+    map["desc"] = string description of this schema
+    map["properties"] = map of "name":SchemaProperty values.
+    """
+    def __init__(self, _classId=None, _desc=None, _props={},
+                 _map=None):
+        if _map is not None:
+            super(SchemaEventClass,self).__init__(_map=_map)
+        else:
+            super(SchemaEventClass, self).__init__(_classId=_classId,
+                                                   _desc=_desc)
+            for name,value in _props.iteritems():
+                self.set_value(name, value, self.SUBTYPE_PROPERTY)
+
+        if self._classId.get_type() != SchemaClassId.TYPE_EVENT:
+            raise TypeError("Invalid ClassId type for event schema: %s" %
+                            self._classId)
+
+    # map constructor
+    def __from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(__from_map)
+
+
+
+
+
+
+
+
+
+
+
diff --git a/qpid/python/qmf2/console.py b/qpid/python/qmf2/console.py
new file mode 100644
index 0000000..8e8f479
--- /dev/null
+++ b/qpid/python/qmf2/console.py
@@ -0,0 +1,1959 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys
+import os
+import logging
+import platform
+import time
+import datetime
+import Queue
+from threading import Thread
+from threading import Lock
+from threading import currentThread
+from threading import Condition
+
+from qpid.messaging import Connection, Message, Empty, SendError
+
+from common import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
+                       QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
+                       AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
+                       SchemaClass, SchemaClassId, SchemaEventClass,
+                       SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
+
+
+
+# global flag that indicates which thread (if any) is
+# running the console notifier callback
+_callback_thread=None
+
+
+
+
+##==============================================================================
+## Sequence Manager  
+##==============================================================================
+
+class _Mailbox(object):
+    """
+    Virtual base class for all Mailbox-like objects
+    """
+    def __init__(self):
+        self._msgs = []
+        self._cv = Condition()
+        self._waiting = False
+
+    def deliver(self, obj):
+        self._cv.acquire()
+        try:
+            self._msgs.append(obj)
+            # if was empty, notify waiters
+            if len(self._msgs) == 1:
+                self._cv.notify()
+        finally:
+            self._cv.release()
+
+    def fetch(self, timeout=None):
+        self._cv.acquire()
+        try:
+            if len(self._msgs) == 0:
+                self._cv.wait(timeout)
+            if len(self._msgs):
+                return self._msgs.pop()
+            return None
+        finally:
+            self._cv.release()
+
+
+
+class SequencedWaiter(object):
+    """ 
+    Manage sequence numbers for asynchronous method calls. 
+    Allows the caller to associate a generic piece of data with a unique sequence
+    number."""
+
+    def __init__(self):
+        self.lock     = Lock()
+        self.sequence = long(time.time())  # pseudo-randomize seq start
+        self.pending  = {}
+
+
+    def allocate(self):
+        """ 
+        Reserve a sequence number.
+        
+        @rtype: long
+        @return: a unique nonzero sequence number.
+        """
+        self.lock.acquire()
+        try:
+            seq = self.sequence
+            self.sequence = self.sequence + 1
+            self.pending[seq] = _Mailbox()
+        finally:
+            self.lock.release()
+        logging.debug( "sequence %d allocated" % seq)
+        return seq
+
+
+    def put_data(self, seq, new_data):
+        seq = long(seq)
+        logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) )
+        self.lock.acquire()
+        try:
+            if seq in self.pending:
+                # logging.error("Putting seq %d @ %s" % (seq,time.time()))
+                self.pending[seq].deliver(new_data)
+            else:
+                logging.error( "seq %d not found!" % seq )
+        finally:
+            self.lock.release()
+
+
+
+    def get_data(self, seq, timeout=None):
+        """ 
+        Release a sequence number reserved using the reserve method.  This must
+        be called when the sequence is no longer needed.
+        
+        @type seq: int
+        @param seq: a sequence previously allocated by calling reserve().
+        @rtype: any
+        @return: the data originally associated with the reserved sequence number.
+        """
+        seq = long(seq)
+        logging.debug( "getting data for seq=%d" % seq)
+        mbox = None
+        self.lock.acquire()
+        try:
+            if seq in self.pending:
+                mbox = self.pending[seq]
+        finally:
+            self.lock.release()
+
+        # Note well: pending list is unlocked, so we can wait.
+        # we reference mbox locally, so it will not be released
+        # until we are done.
+
+        if mbox:
+            d = mbox.fetch(timeout)
+            logging.debug( "seq %d fetched %r!" % (seq, d) )
+            return d
+
+        logging.debug( "seq %d not found!" % seq )
+        return None
+
+
+    def release(self, seq):
+        """
+        Release the sequence, and its mailbox
+        """
+        seq = long(seq)
+        logging.debug( "releasing seq %d" % seq )
+        self.lock.acquire()
+        try:
+            if seq in self.pending:
+                del self.pending[seq]
+        finally:
+            self.lock.release()
+
+
+    def isValid(self, seq):
+        """
+        True if seq is in use, else False (seq is unknown)
+        """
+        seq = long(seq)
+        self.lock.acquire()
+        try:
+            return seq in self.pending
+        finally:
+            self.lock.release()
+        return False
+
+
+##==============================================================================
+## DATA MODEL
+##==============================================================================
+
+
+class QmfConsoleData(QmfData):
+    """
+    Console's representation of an managed QmfData instance.  
+    """
+    def __init__(self, map_, agent, _schema=None):
+        super(QmfConsoleData, self).__init__(_map=map_,
+                                             _schema=_schema,
+                                             _const=True) 
+        self._agent = agent
+
+    def get_timestamps(self): 
+        """
+        Returns a list of timestamps describing the lifecycle of
+        the object.  All timestamps are represented by the AMQP
+        timestamp type.  [0] = time of last update from Agent,
+                         [1] = creation timestamp 
+                         [2] = deletion timestamp, or zero if not
+        deleted.
+        """
+        return [self._utime, self._ctime, self._dtime]
+
+    def get_create_time(self): 
+        """
+        returns the creation timestamp
+        """
+        return self._ctime
+
+    def get_update_time(self): 
+        """
+        returns the update timestamp
+        """
+        return self._utime
+
+    def get_delete_time(self): 
+        """
+        returns the deletion timestamp, or zero if not yet deleted.
+        """
+        return self._dtime
+
+    def is_deleted(self): 
+        """
+        True if deletion timestamp not zero.
+        """
+        return self._dtime != long(0)
+
+    def refresh(self, _reply_handle=None, _timeout=None): 
+        """
+        request that the Agent update the value of this object's
+        contents.
+        """
+        if _reply_handle is not None:
+            logging.error(" ASYNC REFRESH TBD!!!")
+            return None
+
+        assert self._agent
+        assert self._agent._console
+
+        if _timeout is None:
+            _timeout = self._agent._console._reply_timeout
+
+
+        # create query to agent using this objects ID
+        oid = self.get_object_id()
+        query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
+                                   self.get_object_id())
+        obj_list = self._agent._console.doQuery(self._agent, query,
+                                                timeout=_timeout)
+        if obj_list is None or len(obj_list) != 1:
+            return None
+
+        self._update(obj_list[0])
+        return self
+
+
+    def invoke_method(self, name, _in_args={}, _reply_handle=None,
+                      _timeout=None):
+        """
+        invoke the named method.
+        """
+        assert self._agent
+        assert self._agent._console
+
+        oid = self.get_object_id()
+        if oid is None:
+            raise ValueError("Cannot invoke methods on unmanaged objects.")
+
+        if _timeout is None:
+            _timeout = self._agent._console._reply_timeout
+
+        if _in_args:
+            _in_args = _in_args.copy()
+
+        if self._schema:
+            # validate
+            ms = self._schema.get_method(name)
+            if ms is None:
+                raise ValueError("Method '%s' is undefined." % name)
+
+            for aname,prop in ms.get_arguments().iteritems():
+                if aname not in _in_args:
+                    if prop.get_default():
+                        _in_args[aname] = prop.get_default()
+                    elif not prop.is_optional():
+                        raise ValueError("Method '%s' requires argument '%s'"
+                                         % (name, aname))
+            for aname in _in_args.iterkeys():
+                prop = ms.get_argument(aname)
+                if prop is None:
+                    raise ValueError("Method '%s' does not define argument"
+                                     " '%s'" % (name, aname))
+                if "I" not in prop.get_direction():
+                    raise ValueError("Method '%s' argument '%s' is not an"
+                                     " input." % (name, aname)) 
+
+            # @todo check if value is correct (type, range, etc)
+
+        handle = self._agent._console._req_correlation.allocate()
+        if handle == 0:
+            raise Exception("Can not allocate a correlation id!")
+
+        _map = {self.KEY_OBJECT_ID:str(oid),
+                SchemaMethod.KEY_NAME:name}
+        if _in_args:
+            _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+
+        logging.debug("Sending method req to Agent (%s)" % time.time())
+        try:
+            self._agent._sendMethodReq(_map, handle)
+        except SendError, e:
+            logging.error(str(e))
+            self._agent._console._req_correlation.release(handle)
+            return None
+
+        # @todo async method calls!!!
+        if _reply_handle is not None:
+            print("ASYNC TBD")
+
+        logging.debug("Waiting for response to method req (%s)" % _timeout)
+        replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout)
+        self._agent._console._req_correlation.release(handle)
+        if not replyMsg:
+            logging.debug("Agent method req wait timed-out.")
+            return None
+
+        _map = replyMsg.content.get(MsgKey.method)
+        if not _map:
+            logging.error("Invalid method call reply message")
+            return None
+
+        error=_map.get(SchemaMethod.KEY_ERROR)
+        if error:
+            return MethodResult(_error=QmfData.from_map(error))
+        else:
+            return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
+
+    def _update(self, newer):
+        super(QmfConsoleData,self).__init__(_values=newer._values, _subtypes=newer._subtypes,
+                                           _tag=newer._tag, _object_id=newer._object_id,
+                                           _ctime=newer._ctime, _utime=newer._utime, 
+                                           _dtime=newer._dtime,
+                                           _schema=newer._schema, _const=True)
+
+class QmfLocalData(QmfData):
+    """
+    Console's representation of an unmanaged QmfData instance.  There
+    is no remote agent associated with this instance. The Console has
+    full control over this instance.
+    """
+    def __init__(self, values, _subtypes={}, _tag=None, _object_id=None,
+                 _schema=None):
+        # timestamp in millisec since epoch UTC
+        ctime = long(time.time() * 1000)
+        super(QmfLocalData, self).__init__(_values=values,
+                                           _subtypes=_subtypes, _tag=_tag, 
+                                           _object_id=_object_id,
+                                           _schema=_schema, _ctime=ctime,
+                                           _utime=ctime, _const=False)
+
+
+class Agent(object):
+    """
+    A local representation of a remote agent managed by this console.
+    """
+    def __init__(self, name, console):
+        """
+        @type name: AgentId
+        @param name: uniquely identifies this agent in the AMQP domain.
+        """
+
+        if not isinstance(console, Console):
+            raise TypeError("parameter must be an instance of class Console")
+
+        self._name = name
+        self._address = QmfAddress.direct(name, console._domain)
+        self._console = console
+        self._sender = None
+        self._packages = {} # map of {package-name:[list of class-names], } for this agent
+        self._subscriptions = [] # list of active standing subscriptions for this agent
+        self._announce_timestamp = None # datetime when last announce received
+        logging.debug( "Created Agent with address: [%s]" % self._address )
+
+
+    def get_name(self):
+        return self._name
+
+    def isActive(self):
+        return self._announce_timestamp != None
+    
+    def _sendMsg(self, msg, correlation_id=None):
+        """
+        Low-level routine to asynchronously send a message to this agent.
+        """
+        msg.reply_to = str(self._console._address)
+        # handle = self._console._req_correlation.allocate()
+        # if handle == 0:
+        #    raise Exception("Can not allocate a correlation id!")
+        # msg.correlation_id = str(handle)
+        if correlation_id:
+            msg.correlation_id = str(correlation_id)
+        self._sender.send(msg)
+        # return handle
+
+    def get_packages(self):
+        """
+        Return a list of the names of all packages known to this agent.
+        """
+        return self._packages.keys()
+
+    def get_classes(self):
+        """
+        Return a dictionary [key:class] of classes known to this agent.
+        """
+        return self._packages.copy()
+
+    def get_objects(self, query, kwargs={}):
+        """
+        Return a list of objects that satisfy the given query.
+
+        @type query: dict, or common.Query
+        @param query: filter for requested objects
+        @type kwargs: dict
+        @param kwargs: ??? used to build match selector and query ???
+        @rtype: list
+        @return: list of matching objects, or None.
+        """
+        pass
+
+    def get_object(self, query, kwargs={}):
+        """
+        Get one object - query is expected to match only one object.
+        ??? Recommended: explicit timeout param, default None ???
+
+        @type query: dict, or common.Query
+        @param query: filter for requested objects
+        @type kwargs: dict
+        @param kwargs: ??? used to build match selector and query ???
+        @rtype: qmfConsole.ObjectProxy
+        @return: one matching object, or none
+        """
+        pass
+
+
+    def create_subscription(self, query):
+        """
+        Factory for creating standing subscriptions based on a given query.
+
+        @type query: common.Query object
+        @param query: determines the list of objects for which this subscription applies
+        @rtype: qmfConsole.Subscription
+        @returns: an object representing the standing subscription.
+        """
+        pass
+
+
+    def invoke_method(self, name, _in_args={}, _reply_handle=None,
+                      _timeout=None): 
+        """
+        """
+        assert self._console
+
+        if _timeout is None:
+            _timeout = self._console._reply_timeout
+
+        if _in_args:
+            _in_args = _in_args.copy()
+
+        handle = self._console._req_correlation.allocate()
+        if handle == 0:
+            raise Exception("Can not allocate a correlation id!")
+
+        _map = {SchemaMethod.KEY_NAME:name}
+        if _in_args:
+            _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+
+        logging.debug("Sending method req to Agent (%s)" % time.time())
+        try:
+            self._sendMethodReq(_map, handle)
+        except SendError, e:
+            logging.error(str(e))
+            self._console._req_correlation.release(handle)
+            return None
+
+        # @todo async method calls!!!
+        if _reply_handle is not None:
+            print("ASYNC TBD")
+
+        logging.debug("Waiting for response to method req (%s)" % _timeout)
+        replyMsg = self._console._req_correlation.get_data(handle, _timeout)
+        self._console._req_correlation.release(handle)
+        if not replyMsg:
+            logging.debug("Agent method req wait timed-out.")
+            return None
+
+        _map = replyMsg.content.get(MsgKey.method)
+        if not _map:
+            logging.error("Invalid method call reply message")
+            return None
+
+        return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
+                            _error=_map.get(SchemaMethod.KEY_ERROR))
+
+    def enable_events(self):
+        raise Exception("enable_events tbd")
+
+    def disable_events(self):
+        raise Exception("disable_events tbd")
+
+    def destroy(self):
+        raise Exception("destroy tbd")
+
+    def __repr__(self):
+        return str(self._address)
+    
+    def __str__(self):
+        return self.__repr__()
+
+    def _sendQuery(self, query, correlation_id=None):
+        """
+        """
+        msg = Message(subject=makeSubject(OpCode.get_query),
+                      properties={"method":"request"},
+                      content={MsgKey.query: query.map_encode()})
+        self._sendMsg( msg, correlation_id )
+
+
+    def _sendMethodReq(self, mr_map, correlation_id=None):
+        """
+        """
+        msg = Message(subject=makeSubject(OpCode.method_req),
+                      properties={"method":"request"},
+                      content=mr_map)
+        self._sendMsg( msg, correlation_id )
+
+
+  ##==============================================================================
+  ## METHOD CALL
+  ##==============================================================================
+
+class MethodResult(object):
+    def __init__(self, _out_args=None, _error=None):
+        self._error = _error
+        self._out_args = _out_args
+
+    def succeeded(self): 
+        return self._error is None
+
+    def get_exception(self):
+        return self._error
+
+    def get_arguments(self): 
+        return self._out_args
+
+    def get_argument(self, name): 
+        arg = None
+        if self._out_args:
+            arg = self._out_args.get(name)
+        return arg
+
+
+  ##==============================================================================
+  ## CONSOLE
+  ##==============================================================================
+
+
+
+
+
+
+class Console(Thread):
+    """
+    A Console manages communications to a collection of agents on behalf of an application.
+    """
+    def __init__(self, name=None, _domain=None, notifier=None, 
+                 reply_timeout = 60,
+                 # agent_timeout = 120,
+                 agent_timeout = 60,
+                 kwargs={}):
+        """
+        @type name: str
+        @param name: identifier for this console.  Must be unique.
+        @type notifier: qmfConsole.Notifier
+        @param notifier: invoked when events arrive for processing.
+        @type kwargs: dict
+        @param kwargs: ??? Unused
+        """
+        Thread.__init__(self)
+        if not name:
+            self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
+        else:
+            self._name = str(name)
+        self._domain = _domain
+        self._address = QmfAddress.direct(self._name, self._domain)
+        self._notifier = notifier
+        self._lock = Lock()
+        self._conn = None
+        self._session = None
+        # dict of "agent-direct-address":class Agent entries
+        self._agent_map = {}
+        self._direct_recvr = None
+        self._announce_recvr = None
+        self._locate_sender = None
+        self._schema_cache = {}
+        self._req_correlation = SequencedWaiter()
+        self._operational = False
+        self._agent_discovery_filter = None
+        self._reply_timeout = reply_timeout
+        self._agent_timeout = agent_timeout
+        self._next_agent_expire = None
+        # lock out run() thread
+        self._cv = Condition()
+        # for passing WorkItems to the application
+        self._work_q = Queue.Queue()
+        self._work_q_put = False
+        ## Old stuff below???
+        #self._broker_list = []
+        #self.impl = qmfengine.Console()
+        #self._event = qmfengine.ConsoleEvent()
+        ##self._cv = Condition()
+        ##self._sync_count = 0
+        ##self._sync_result = None
+        ##self._select = {}
+        ##self._cb_cond = Condition()
+
+
+
+    def destroy(self, timeout=None):
+        """
+        Must be called before the Console is deleted.  
+        Frees up all resources and shuts down all background threads.
+
+        @type timeout: float
+        @param timeout: maximum time in seconds to wait for all background threads to terminate.  Default: forever.
+        """
+        logging.debug("Destroying Console...")
+        if self._conn:
+            self.removeConnection(self._conn, timeout)
+        logging.debug("Console Destroyed")
+
+
+
+    def addConnection(self, conn):
+        """
+        Add a AMQP connection to the console.  The console will setup a session over the
+        connection.  The console will then broadcast an Agent Locate Indication over
+        the session in order to discover present agents.
+
+        @type conn: qpid.messaging.Connection
+        @param conn: the connection to the AMQP messaging infrastructure.
+        """
+        if self._conn:
+            raise Exception( "Multiple connections per Console not supported." );
+        self._conn = conn
+        self._session = conn.session(name=self._name)
+        self._direct_recvr = self._session.receiver(str(self._address) +
+                                                    ";{create:always,"
+                                                    " node-properties:"
+                                                    " {type:topic,"
+                                                    " x-properties:"
+                                                    " {type:direct}}}", 
+                                                    capacity=1)
+        logging.debug("my direct addr=%s" % self._direct_recvr.source)
+
+        ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+        self._announce_recvr = self._session.receiver(str(ind_addr) +
+                                                      ";{create:always,"
+                                                      " node-properties:{type:topic}}",
+                                                      capacity=1)
+        logging.debug("agent.ind addr=%s" % self._announce_recvr.source)
+
+        locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+        self._locate_sender = self._session.sender(str(locate_addr) +
+                                                   ";{create:always,"
+                                                   " node-properties:{type:topic}}")
+        logging.debug("agent.locate addr=%s" % self._locate_sender.target)
+
+        #
+        # Now that receivers are created, fire off the receive thread...
+        #
+        self._operational = True
+        self.start()
+
+
+
+    def removeConnection(self, conn, timeout=None):
+        """
+        Remove an AMQP connection from the console.  Un-does the add_connection() operation,
+        and releases any agents and sessions associated with the connection.
+
+        @type conn: qpid.messaging.Connection
+        @param conn: connection previously added by add_connection()
+        """
+        if self._conn and conn and conn != self._conn:
+            logging.error( "Attempt to delete unknown connection: %s" % str(conn))
+
+        # tell connection thread to shutdown
+        self._operational = False
+        if self.isAlive():
+            # kick my thread to wake it up
+            logging.debug("Making temp sender for [%s]" % self._address)
+            tmp_sender = self._session.sender(str(self._address))
+            try:
+                msg = Message(subject=makeSubject(OpCode.noop))
+                tmp_sender.send( msg, sync=True )
+            except SendError, e:
+                logging.error(str(e))
+            logging.debug("waiting for console receiver thread to exit")
+            self.join(timeout)
+            if self.isAlive():
+                logging.error( "Console thread '%s' is hung..." % self.getName() )
+        self._direct_recvr.close()
+        self._announce_recvr.close()
+        self._locate_sender.close()
+        self._session.close()
+        self._session = None
+        self._conn = None
+        logging.debug("console connection removal complete")
+
+
+    def getAddress(self):
+        """
+        The AMQP address this Console is listening to.
+        """
+        return self._address
+
+
+    def destroyAgent( self, agent ):
+        """
+        Undoes create.
+        """
+        if not isinstance(agent, Agent):
+            raise TypeError("agent must be an instance of class Agent")
+
+        self._lock.acquire()
+        try:
+            if agent._id in self._agent_map:
+                del self._agent_map[agent._id]
+        finally:
+            self._lock.release()
+
+    def find_agent(self, name, timeout=None ):
+        """
+        Given the name of a particular agent, return an instance of class Agent 
+        representing that agent.  Return None if the agent does not exist.
+        """
+
+        self._lock.acquire()
+        try:
+            agent = self._agent_map.get(name)
+            if agent:
+                return agent
+        finally:
+            self._lock.release()
+
+        # agent not present yet - ping it with an agent_locate
+
+        handle = self._req_correlation.allocate()
+        if handle == 0:
+            raise Exception("Can not allocate a correlation id!")
+        try:
+            tmp_sender = self._session.sender(str(QmfAddress.direct(name,
+                                                                    self._domain))
+                                              + ";{create:always,"
+                                              " node-properties:"
+                                              " {type:topic,"
+                                              " x-properties:"
+                                              " {type:direct}}}")
+
+            query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
+            msg = Message(subject=makeSubject(OpCode.agent_locate),
+                          properties={"method":"request"},
+                          content={MsgKey.query: query.map_encode()})
+            msg.reply_to = str(self._address)
+            msg.correlation_id = str(handle)
+            logging.debug("Sending Agent Locate (%s)" % time.time())
+            tmp_sender.send( msg )
+        except SendError, e:
+            logging.error(str(e))
+            self._req_correlation.release(handle)
+            return None
+
+        if timeout is None:
+            timeout = self._reply_timeout
+
+        new_agent = None
+        logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
+        self._req_correlation.get_data( handle, timeout )
+        self._req_correlation.release(handle)
+        logging.debug("Agent Locate wait ended (%s)" % time.time())
+        self._lock.acquire()
+        try:
+            new_agent = self._agent_map.get(name)
+        finally:
+            self._lock.release()
+        return new_agent
+
+
+    def doQuery(self, agent, query, timeout=None ):
+        """
+        """
+
+        target = query.get_target()
+        handle = self._req_correlation.allocate()
+        if handle == 0:
+            raise Exception("Can not allocate a correlation id!")
+        try:
+            logging.debug("Sending Query to Agent (%s)" % time.time())
+            agent._sendQuery(query, handle)
+        except SendError, e:
+            logging.error(str(e))
+            self._req_correlation.release(handle)
+            return None
+
+        if not timeout:
+            timeout = self._reply_timeout
+
+        logging.debug("Waiting for response to Query (%s)" % timeout)
+        reply = self._req_correlation.get_data(handle, timeout)
+        self._req_correlation.release(handle)
+        if not reply:
+            logging.debug("Agent Query wait timed-out.")
+            return None
+
+        if target == QmfQuery.TARGET_PACKAGES:
+            # simply pass back the list of package names
+            logging.debug("Response to Packet Query received")
+            return reply.content.get(MsgKey.package_info)
+        elif target == QmfQuery.TARGET_OBJECT_ID:
+            # simply pass back the list of object_id's
+            logging.debug("Response to Object Id Query received")
+            return reply.content.get(MsgKey.object_id)
+        elif target == QmfQuery.TARGET_SCHEMA_ID:
+            logging.debug("Response to Schema Id Query received")
+            id_list = []
+            for sid_map in reply.content.get(MsgKey.schema_id):
+                id_list.append(SchemaClassId.from_map(sid_map))
+            return id_list
+        elif target == QmfQuery.TARGET_SCHEMA:
+            logging.debug("Response to Schema Query received")
+            schema_list = []
+            for schema_map in reply.content.get(MsgKey.schema):
+                # extract schema id, convert based on schema type
+                sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+                if sid_map:
+                    sid = SchemaClassId.from_map(sid_map)
+                    if sid:
+                        if sid.get_type() == SchemaClassId.TYPE_DATA:
+                            schema = SchemaObjectClass.from_map(schema_map)
+                        else:
+                            schema = SchemaEventClass.from_map(schema_map)
+                        schema_list.append(schema)
+                        self._add_schema(schema)
+            return schema_list
+        elif target == QmfQuery.TARGET_OBJECT:
+            logging.debug("Response to Object Query received")
+            obj_list = []
+            for obj_map in reply.content.get(MsgKey.data_obj):
+                # if the object references a schema, fetch it
+                sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+                if sid_map:
+                    sid = SchemaClassId.from_map(sid_map)
+                    schema = self._fetch_schema(sid, _agent=agent,
+                                                _timeout=timeout)
+                    if not schema:
+                        logging.warning("Unknown schema, id=%s" % sid)
+                        continue
+                    obj = QmfConsoleData(map_=obj_map, agent=agent,
+                                         _schema=schema)
+                else:
+                    # no schema needed
+                    obj = QmfConsoleData(map_=obj_map, agent=agent)
+                obj_list.append(obj)
+            return obj_list
+        else:
+            logging.warning("Unexpected Target for a Query: '%s'" % target)
+            return None
+
+    def run(self):
+        global _callback_thread
+        #
+        # @todo KAG Rewrite when api supports waiting on multiple receivers
+        #
+        while self._operational:
+
+            # qLen = self._work_q.qsize()
+
+            while True:
+                try:
+                    msg = self._announce_recvr.fetch(timeout=0)
+                except Empty:
+                    break
+                self._dispatch(msg, _direct=False)
+
+            while True:
+                try:
+                    msg = self._direct_recvr.fetch(timeout = 0)
+                except Empty:
+                    break
+                self._dispatch(msg, _direct=True)
+
+            for agent in self._agent_map.itervalues():
+                try:
+                    msg = agent._event_recvr.fetch(timeout = 0)
+                except Empty:
+                    continue
+                self._dispatch(msg, _direct=False)
+
+
+            self._expireAgents()   # check for expired agents
+
+            #if qLen == 0 and self._work_q.qsize() and self._notifier:
+            if self._work_q_put and self._notifier:
+                # new stuff on work queue, kick the the application...
+                self._work_q_put = False
+                _callback_thread = currentThread()
+                logging.info("Calling console notifier.indication")
+                self._notifier.indication()
+                _callback_thread = None
+
+            if self._operational:
+                # wait for a message to arrive or an agent
+                # to expire
+                now = datetime.datetime.utcnow()
+                if self._next_agent_expire > now:
+                    timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds
+                    try:
+                        logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
+                        xxx = self._session.next_receiver(timeout = timeout)
+                    except Empty:
+                        pass
+
+
+        logging.debug("Shutting down Console thread")
+
+    def get_objects(self,
+                    _schema_id=None,
+                    _pname=None, _cname=None,
+                    _object_id=None,
+                    _agents=None,
+                    _timeout=None):
+        """
+        @todo
+        """
+        if _object_id is not None:
+            # query by object id
+            query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id)
+        elif _schema_id is not None:
+            pred = QmfQueryPredicate({QmfQuery.CMP_EQ:
+                                          [QmfData.KEY_SCHEMA_ID,
+                                           _schema_id.map_encode()]})
+            query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
+        elif _pname is not None:
+            # query by package name (and maybe class name)
+            if _cname is not None:
+                pred = QmfQueryPredicate({QmfQuery.LOGIC_AND:
+                                              [{QmfQuery.CMP_EQ:
+                                                    [SchemaClassId.KEY_PACKAGE,
+                                                     _pname]},
+                                               {QmfQuery.CMP_EQ:
+                                                    [SchemaClassId.KEY_CLASS,
+                                                     _cname]}]})
+            else:
+                pred = QmfQueryPredicate({QmfQuery.CMP_EQ:
+                                              [SchemaClassId.KEY_PACKAGE,
+                                               _pname]})
+            query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
+
+        else:
+            raise Exception("invalid arguments")
+
+        if _agents is None:
+            # use copy of current agent list
+            self._lock.acquire()
+            try:
+                agent_list = self._agent_map.values()
+            finally:
+                self._lock.release()
+        elif isinstance(_agents, Agent):
+            agent_list = [_agents]
+        else:
+            agent_list = _agents
+            # @todo validate this list!
+
+        # @todo: fix when async doQuery done - query all agents at once, then
+        # wait for replies, instead of per-agent querying....
+
+        if _timeout is None:
+            _timeout = self._reply_timeout
+
+        obj_list = []
+        expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)
+        for agent in agent_list:
+            if not agent.isActive():
+                continue
+            now = datetime.datetime.utcnow()
+            if now >= expired:
+                break
+            timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds
+            reply = self.doQuery(agent, query, timeout)
+            if reply:
+                obj_list = obj_list + reply
+
+        if obj_list:
+            return obj_list
+        return None
+
+
+
+    # called by run() thread ONLY
+    #
+    def _dispatch(self, msg, _direct=True):
+        """
+        PRIVATE: Process a message received from an Agent
+        """
+        logging.debug( "Message received from Agent! [%s]" % msg )
+        try:
+            version,opcode = parseSubject(msg.subject)
+            # @todo: deal with version mismatch!!!
+        except:
+            logging.error("Ignoring unrecognized broadcast message '%s'" % msg.subject)
+            return
+
+        cmap = {}; props = {}
+        if msg.content_type == "amqp/map":
+            cmap = msg.content
+        if msg.properties:
+            props = msg.properties
+
+        if opcode == OpCode.agent_ind:
+            self._handleAgentIndMsg( msg, cmap, version, _direct )
+        elif opcode == OpCode.data_ind:
+            self._handleDataIndMsg(msg, cmap, version, _direct)
+        elif opcode == OpCode.event_ind:
+            self._handleEventIndMsg(msg, cmap, version, _direct)
+        elif opcode == OpCode.managed_object:
+            logging.warning("!!! managed_object TBD !!!")
+        elif opcode == OpCode.object_ind:
+            logging.warning("!!! object_ind TBD !!!")
+        elif opcode == OpCode.response:
+            self._handleResponseMsg(msg, cmap, version, _direct)
+        elif opcode == OpCode.schema_ind:
+            logging.warning("!!! schema_ind TBD !!!")
+        elif opcode == OpCode.noop:
+             logging.debug("No-op msg received.")
+        else:
+            logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
+
+
+    def _handleAgentIndMsg(self, msg, cmap, version, direct):
+        """
+        Process a received agent-ind message.  This message may be a response to a
+        agent-locate, or it can be an unsolicited agent announce.
+        """
+        logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time()))
+
+        ai_map = cmap.get(MsgKey.agent_info)
+        if not ai_map or not isinstance(ai_map, type({})):
+            logging.warning("Bad agent-ind message received: '%s'" % msg)
+            return
+        name = ai_map.get("_name")
+        if not name:
+            logging.warning("Bad agent-ind message received: agent name missing"
+                            " '%s'" % msg)
+            return
+
+        ignore = True
+        matched = False
+        correlated = False
+        agent_query = self._agent_discovery_filter
+
+        if msg.correlation_id:
+            correlated = self._req_correlation.isValid(msg.correlation_id)
+
+        if direct and correlated:
+            ignore = False
+        elif agent_query:
+            matched = agent_query.evaluate(QmfData.create(values=ai_map))
+            ignore = not matched
+
+        if not ignore:
+            agent = None
+            self._lock.acquire()
+            try:
+                agent = self._agent_map.get(name)
+            finally:
+                self._lock.release()
+
+            if not agent:
+                # need to create and add a new agent
+                agent = self._createAgent(name)
+                if not agent:
+                    return   # failed to add agent
+
+            # lock out expiration scanning code
+            self._lock.acquire()
+            try:
+                old_timestamp = agent._announce_timestamp
+                agent._announce_timestamp = datetime.datetime.utcnow()
+            finally:
+                self._lock.release()
+
+            if old_timestamp == None and matched:
+                logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
+                wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
+                self._work_q.put(wi)
+                self._work_q_put = True
+
+            if correlated:
+                # wake up all waiters
+                logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+                self._req_correlation.put_data(msg.correlation_id, msg)
+
+
+
+
+    def _handleDataIndMsg(self, msg, cmap, version, direct):
+        """
+        Process a received data-ind message.
+        """
+        logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time()))
+
+        if not self._req_correlation.isValid(msg.correlation_id):
+            logging.debug("Data indicate received with unknown correlation_id"
+                          " msg='%s'" % str(msg)) 
+            return
+
+        # wake up all waiters
+        logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+        self._req_correlation.put_data(msg.correlation_id, msg)
+
+
+    def _handleResponseMsg(self, msg, cmap, version, direct):
+        """
+        Process a received data-ind message.
+        """
+        # @todo code replication - clean me.
+        logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time()))
+
+        if not self._req_correlation.isValid(msg.correlation_id):
+            logging.debug("Response msg received with unknown correlation_id"
+                          " msg='%s'" % str(msg))
+            return
+
+        # wake up all waiters
+        logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+        self._req_correlation.put_data(msg.correlation_id, msg)
+
+    def _handleEventIndMsg(self, msg, cmap, version, _direct):
+        ei_map = cmap.get(MsgKey.event)
+        if not ei_map or not isinstance(ei_map, type({})):
+            logging.warning("Bad event indication message received: '%s'" % msg)
+            return
+
+        aname = ei_map.get("_name")
+        emap = ei_map.get("_event")
+        if not aname:
+            logging.debug("No '_name' field in event indication message.")
+            return
+        if not emap:
+            logging.debug("No '_event' field in event indication message.")
+            return
+        # @todo: do I need to lock this???
+        agent = self._agent_map.get(aname)
+        if not agent:
+            logging.debug("Agent '%s' not known." % aname)
+            return
+        try:
+            # @todo: schema???
+            event = QmfEvent.from_map(emap)
+        except TypeError:
+            logging.debug("Invalid QmfEvent map received: %s" % str(emap))
+            return
+
+        # @todo: schema?  Need to fetch it, but not from this thread!
+        # This thread can not pend on a request.
+        logging.debug("Publishing event received from agent %s" % aname)
+        wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
+                      {"agent":agent,
+                       "event":event})
+        self._work_q.put(wi)
+        self._work_q_put = True
+
+
+    def _expireAgents(self):
+        """
+        Check for expired agents and issue notifications when they expire.
+        """
+        now = datetime.datetime.utcnow()
+        if self._next_agent_expire and now < self._next_agent_expire:
+            return
+        lifetime_delta = datetime.timedelta(seconds = self._agent_timeout)
+        next_expire_delta = lifetime_delta
+        self._lock.acquire()
+        try:
+            logging.debug("!!! expiring agents '%s'" % now)
+            for agent in self._agent_map.itervalues():
+                if agent._announce_timestamp:
+                    agent_deathtime = agent._announce_timestamp + lifetime_delta
+                    if agent_deathtime <= now:
+                        logging.debug("AGENT_DELETED for %s" % agent)
+                        agent._announce_timestamp = None
+                        wi = WorkItem(WorkItem.AGENT_DELETED, None,
+                                      {"agent":agent})
+                        # @todo: remove agent from self._agent_map
+                        self._work_q.put(wi)
+                        self._work_q_put = True
+                    else:
+                        if (agent_deathtime - now) < next_expire_delta:
+                            next_expire_delta = agent_deathtime - now
+
+            self._next_agent_expire = now + next_expire_delta
+            logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
+        finally:
+            self._lock.release()
+
+
+
+    def _createAgent( self, name ):
+        """
+        Factory to create/retrieve an agent for this console
+        """
+        logging.debug("creating agent %s" % name)
+        self._lock.acquire()
+        try:
+            agent = self._agent_map.get(name)
+            if agent:
+                return agent
+
+            agent = Agent(name, self)
+            try:
+                agent._sender = self._session.sender(str(agent._address) + 
+                                                     ";{create:always,"
+                                                     " node-properties:"
+                                                     " {type:topic,"
+                                                     " x-properties:"
+                                                     " {type:direct}}}") 
+            except:
+                logging.warning("Unable to create sender for %s" % name)
+                return None
+            logging.debug("created agent sender %s" % agent._sender.target)
+
+            events_addr = QmfAddress.topic(name, self._domain)
+            try:
+                agent._event_recvr = self._session.receiver(str(events_addr) +
+                                                            ";{create:always,"
+                                                            " node-properties:{type:topic}}",
+                                                            capacity=1)
+            except:
+                logging.warning("Unable to create event receiver for %s" % name)
+                return None
+            logging.debug("created agent event receiver %s" % agent._event_recvr.source)
+
+            self._agent_map[name] = agent
+        finally:
+            self._lock.release()
+
+        # new agent - query for its schema database for
+        # seeding the schema cache (@todo)
+        # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None})
+        # agent._sendQuery( query )
+
+        return agent
+
+
+
+    def enable_agent_discovery(self, _query=None):
+        """
+        Called to enable the asynchronous Agent Discovery process.
+        Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
+        """
+        # @todo: fix - take predicate only, not entire query!
+        if _query is not None:
+            if (not isinstance(_query, QmfQuery) or
+                _query.get_target() != QmfQuery.TARGET_AGENT):
+                raise TypeError("Type QmfQuery with target == TARGET_AGENT expected")
+            self._agent_discovery_filter = _query
+        else:
+            # create a match-all agent query (no predicate)
+            self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) 
+
+    def disable_agent_discovery(self):
+        """
+        Called to disable the async Agent Discovery process enabled by
+        calling enableAgentDiscovery()
+        """
+        self._agent_discovery_filter = None
+
+
+
+    def get_workitem_count(self):
+        """
+        Returns the count of pending WorkItems that can be retrieved.
+        """
+        return self._work_q.qsize()
+
+
+
+    def get_next_workitem(self, timeout=None):
+        """
+        Returns the next pending work item, or None if none available.
+        @todo: subclass and return an Empty event instead.
+        """
+        try:
+            wi = self._work_q.get(True, timeout)
+        except Queue.Empty:
+            return None
+        return wi
+
+
+    def release_workitem(self, wi):
+        """
+        Return a WorkItem to the Console when it is no longer needed.
+        @todo: call Queue.task_done() - only 2.5+
+
+        @type wi: class qmfConsole.WorkItem
+        @param wi: work item object to return.
+        """
+        pass
+
+    def _add_schema(self, schema):
+        """
+        @todo
+        """
+        if not isinstance(schema, SchemaClass):
+            raise TypeError("SchemaClass type expected")
+
+        self._lock.acquire()
+        try:
+            sid = schema.get_class_id()
+            if not self._schema_cache.has_key(sid):
+                self._schema_cache[sid] = schema
+        finally:
+            self._lock.release()
+
+    def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
+        """
+        Find the schema identified by schema_id.  If not in the cache, ask the
+        agent for it.
+        """
+        if not isinstance(schema_id, SchemaClassId):
+            raise TypeError("SchemaClassId type expected")
+
+        self._lock.acquire()
+        try:
+            schema = self._schema_cache.get(schema_id)
+            if schema:
+                return schema
+        finally:
+            self._lock.release()
+
+        if _agent is None:
+            return None
+
+        # note: doQuery will add the new schema to the cache automatically.
+        slist = self.doQuery(_agent,
+                             QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
+                             _timeout)
+        if slist:
+            return slist[0]
+        else:
+            return None
+
+
+
+    # def get_packages(self):
+    #     plist = []
+    #     for i in range(self.impl.packageCount()):
+    #         plist.append(self.impl.getPackageName(i))
+    #     return plist
+    
+    
+    # def get_classes(self, package, kind=CLASS_OBJECT):
+    #     clist = []
+    #     for i in range(self.impl.classCount(package)):
+    #         key = self.impl.getClass(package, i)
+    #         class_kind = self.impl.getClassKind(key)
+    #         if class_kind == kind:
+    #             if kind == CLASS_OBJECT:
+    #                 clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)}))
+    #             elif kind == CLASS_EVENT:
+    #                 clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)}))
+    #     return clist
+    
+    
+    # def bind_package(self, package):
+    #     return self.impl.bindPackage(package)
+    
+    
+    # def bind_class(self, kwargs = {}):
+    #     if "key" in kwargs:
+    #         self.impl.bindClass(kwargs["key"])
+    #     elif "package" in kwargs:
+    #         package = kwargs["package"]
+    #         if "class" in kwargs:
+    #             self.impl.bindClass(package, kwargs["class"])
+    #         else:
+    #             self.impl.bindClass(package)
+    #     else:
+    #         raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']")
+    
+    
+    # def get_agents(self, broker=None):
+    #     blist = []
+    #     if broker:
+    #         blist.append(broker)
+    #     else:
+    #         self._cv.acquire()
+    #         try:
+    #             # copy while holding lock
+    #             blist = self._broker_list[:]
+    #         finally:
+    #             self._cv.release()
+
+    #     agents = []
+    #     for b in blist:
+    #         for idx in range(b.impl.agentCount()):
+    #             agents.append(AgentProxy(b.impl.getAgent(idx), b))
+
+    #     return agents
+    
+    
+    # def get_objects(self, query, kwargs = {}):
+    #     timeout = 30
+    #     agent = None
+    #     temp_args = kwargs.copy()
+    #     if type(query) == type({}):
+    #         temp_args.update(query)
+
+    #     if "_timeout" in temp_args:
+    #         timeout = temp_args["_timeout"]
+    #         temp_args.pop("_timeout")
+
+    #     if "_agent" in temp_args:
+    #         agent = temp_args["_agent"]
+    #         temp_args.pop("_agent")
+
+    #     if type(query) == type({}):
+    #         query = Query(temp_args)
+
+    #     self._select = {}
+    #     for k in temp_args.iterkeys():
+    #         if type(k) == str:
+    #             self._select[k] = temp_args[k]
+
+    #     self._cv.acquire()
+    #     try:
+    #         self._sync_count = 1
+    #         self._sync_result = []
+    #         broker = self._broker_list[0]
+    #         broker.send_query(query.impl, None, agent)
+    #         self._cv.wait(timeout)
+    #         if self._sync_count == 1:
+    #             raise Exception("Timed out: waiting for query response")
+    #     finally:
+    #         self._cv.release()
+
+    #     return self._sync_result
+    
+    
+    # def get_object(self, query, kwargs = {}):
+    #     '''
+    #     Return one and only one object or None.
+    #     '''
+    #     objs = objects(query, kwargs)
+    #     if len(objs) == 1:
+    #         return objs[0]
+    #     else:
+    #         return None
+
+
+    # def first_object(self, query, kwargs = {}):
+    #     '''
+    #     Return the first of potentially many objects.
+    #     '''
+    #     objs = objects(query, kwargs)
+    #     if objs:
+    #         return objs[0]
+    #     else:
+    #         return None
+
+
+    # # Check the object against select to check for a match
+    # def _select_match(self, object):
+    #     schema_props = object.properties()
+    #     for key in self._select.iterkeys():
+    #         for prop in schema_props:
+    #             if key == p[0].name() and self._select[key] != p[1]:
+    #                 return False
+    #     return True
+
+
+    # def _get_result(self, list, context):
+    #     '''
+    #     Called by Broker proxy to return the result of a query.
+    #     '''
+    #     self._cv.acquire()
+    #     try:
+    #         for item in list:
+    #             if self._select_match(item):
+    #                 self._sync_result.append(item)
+    #         self._sync_count -= 1
+    #         self._cv.notify()
+    #     finally:
+    #         self._cv.release()
+
+
+    # def start_sync(self, query): pass
+    
+    
+    # def touch_sync(self, sync): pass
+    
+    
+    # def end_sync(self, sync): pass
+    
+    
+
+
+#     def start_console_events(self):
+#         self._cb_cond.acquire()
+#         try:
+#             self._cb_cond.notify()
+#         finally:
+#             self._cb_cond.release()
+
+
+#     def _do_console_events(self):
+#         '''
+#         Called by the Console thread to poll for events.  Passes the events
+#         onto the ConsoleHandler associated with this Console.  Is called
+#         periodically, but can also be kicked by Console.start_console_events().
+#         '''
+#         count = 0
+#         valid = self.impl.getEvent(self._event)
+#         while valid:
+#             count += 1
+#             try:
+#                 if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
+#                     logging.debug("Console Event AGENT_ADDED received")
+#                     if self._handler:
+#                         self._handler.agent_added(AgentProxy(self._event.agent, None))
+#                 elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
+#                     logging.debug("Console Event AGENT_DELETED received")
+#                     if self._handler:
+#                         self._handler.agent_deleted(AgentProxy(self._event.agent, None))
+#                 elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
+#                     logging.debug("Console Event NEW_PACKAGE received")
+#                     if self._handler:
+#                         self._handler.new_package(self._event.name)
+#                 elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
+#                     logging.debug("Console Event NEW_CLASS received")
+#                     if self._handler:
+#                         self._handler.new_class(SchemaClassKey(self._event.classKey))
+#                 elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
+#                     logging.debug("Console Event OBJECT_UPDATE received")
+#                     if self._handler:
+#                         self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
+#                                                     self._event.hasProps, self._event.hasStats)
+#                 elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
+#                     logging.debug("Console Event EVENT_RECEIVED received")
+#                 elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
+#                     logging.debug("Console Event AGENT_HEARTBEAT received")
+#                     if self._handler:
+#                         self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
+#                 elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
+#                     logging.debug("Console Event METHOD_RESPONSE received")
+#                 else:
+#                     logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
+#             except e:
+#                 print "Exception caught in callback thread:", e
+#             self.impl.popEvent()
+#             valid = self.impl.getEvent(self._event)
+#         return count
+
+
+
+
+
+# class Broker(ConnectionHandler):
+#     #   attr_reader :impl :conn, :console, :broker_bank
+#     def __init__(self, console, conn):
+#         self.broker_bank = 1
+#         self.console = console
+#         self.conn = conn
+#         self._session = None
+#         self._cv = Condition()
+#         self._stable = None
+#         self._event = qmfengine.BrokerEvent()
+#         self._xmtMessage = qmfengine.Message()
+#         self.impl = qmfengine.BrokerProxy(self.console.impl)
+#         self.console.impl.addConnection(self.impl, self)
+#         self.conn.add_conn_handler(self)
+#         self._operational = True
+    
+    
+#     def shutdown(self):
+#         logging.debug("broker.shutdown() called.")
+#         self.console.impl.delConnection(self.impl)
+#         self.conn.del_conn_handler(self)
+#         if self._session:
+#             self.impl.sessionClosed()
+#             logging.debug("broker.shutdown() sessionClosed done.")
+#             self._session.destroy()
+#             logging.debug("broker.shutdown() session destroy done.")
+#             self._session = None
+#         self._operational = False
+#         logging.debug("broker.shutdown() done.")
+
+
+#     def wait_for_stable(self, timeout = None):
+#         self._cv.acquire()
+#         try:
+#             if self._stable:
+#                 return
+#             if timeout:
+#                 self._cv.wait(timeout)
+#                 if not self._stable:
+#                     raise Exception("Timed out: waiting for broker connection to become stable")
+#             else:
+#                 while not self._stable:
+#                     self._cv.wait()
+#         finally:
+#             self._cv.release()
+
+
+#     def send_query(self, query, ctx, agent):
+#         agent_impl = None
+#         if agent:
+#             agent_impl = agent.impl
+#         self.impl.sendQuery(query, ctx, agent_impl)
+#         self.conn.kick()
+
+
+#     def _do_broker_events(self):
+#         count = 0
+#         valid = self.impl.getEvent(self._event)
+#         while valid:
+#             count += 1
+#             if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
+#                 logging.debug("Broker Event BROKER_INFO received");
+#             elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
+#                 logging.debug("Broker Event DECLARE_QUEUE received");
+#                 self.conn.impl.declareQueue(self._session.handle, self._event.name)
+#             elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
+#                 logging.debug("Broker Event DELETE_QUEUE received");
+#                 self.conn.impl.deleteQueue(self._session.handle, self._event.name)
+#             elif self._event.kind == qmfengine.BrokerEvent.BIND:
+#                 logging.debug("Broker Event BIND received");
+#                 self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
+#             elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
+#                 logging.debug("Broker Event UNBIND received");
+#                 self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
+#             elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
+#                 logging.debug("Broker Event SETUP_COMPLETE received");
+#                 self.impl.startProtocol()
+#             elif self._event.kind == qmfengine.BrokerEvent.STABLE:
+#                 logging.debug("Broker Event STABLE received");
+#                 self._cv.acquire()
+#                 try:
+#                     self._stable = True
+#                     self._cv.notify()
+#                 finally:
+#                     self._cv.release()
+#             elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE:
+#                 result = []
+#                 for idx in range(self._event.queryResponse.getObjectCount()):
+#                     result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self}))
+#                 self.console._get_result(result, self._event.context)
+#             elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE:
+#                 obj = self._event.context
+#                 obj._method_result(MethodResponse(self._event.methodResponse()))
+            
+#             self.impl.popEvent()
+#             valid = self.impl.getEvent(self._event)
+        
+#         return count
+    
+    
+#     def _do_broker_messages(self):
+#         count = 0
+#         valid = self.impl.getXmtMessage(self._xmtMessage)
+#         while valid:
+#             count += 1
+#             logging.debug("Broker: sending msg on connection")
+#             self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
+#             self.impl.popXmt()
+#             valid = self.impl.getXmtMessage(self._xmtMessage)
+        
+#         return count
+    
+    
+#     def _do_events(self):
+#         while True:
+#             self.console.start_console_events()
+#             bcnt = self._do_broker_events()
+#             mcnt = self._do_broker_messages()
+#             if bcnt == 0 and mcnt == 0:
+#                 break;
+    
+    
+#     def conn_event_connected(self):
+#         logging.debug("Broker: Connection event CONNECTED")
+#         self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
+#         self.impl.sessionOpened(self._session.handle)
+#         self._do_events()
+    
+    
+#     def conn_event_disconnected(self, error):
+#         logging.debug("Broker: Connection event DISCONNECTED")
+#         pass
+    
+    
+#     def conn_event_visit(self):
+#         self._do_events()
+
+
+#     def sess_event_session_closed(self, context, error):
+#         logging.debug("Broker: Session event CLOSED")
+#         self.impl.sessionClosed()
+    
+    
+#     def sess_event_recv(self, context, message):
+#         logging.debug("Broker: Session event MSG_RECV")
+#         if not self._operational:
+#             logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
+#         self.impl.handleRcvMessage(message)
+#         self._do_events()
+
+
+
+################################################################################
+################################################################################
+################################################################################
+################################################################################
+#                 TEMPORARY TEST CODE - TO BE DELETED
+################################################################################
+################################################################################
+################################################################################
+################################################################################
+
+if __name__ == '__main__':
+    # temp test code
+    from common import (qmfTypes, QmfEvent, SchemaProperty)
+
+    logging.getLogger().setLevel(logging.INFO)
+
+    logging.info( "************* Creating Async Console **************" )
+
+    class MyNotifier(Notifier):
+        def __init__(self, context):
+            self._myContext = context
+            self.WorkAvailable = False
+
+        def indication(self):
+            print("Indication received! context=%d" % self._myContext)
+            self.WorkAvailable = True
+
+    _noteMe = MyNotifier( 666 )
+
+    _myConsole = Console(notifier=_noteMe)
+
+    _myConsole.enable_agent_discovery()
+    logging.info("Waiting...")
+
+
+    logging.info( "Destroying console:" )
+    _myConsole.destroy( 10 )
+
+    logging.info( "******** Messing around with Schema ********" )
+
+    _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass",
+                                                    stype=SchemaClassId.TYPE_EVENT), 
+                             _desc="A typical event schema",
+                             _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8,
+                                                                  kwargs = {"min":0,
+                                                                            "max":100,
+                                                                            "unit":"seconds",
+                                                                            "desc":"sleep value"}),
+                                     "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR,
+                                                                  kwargs={"maxlen":100,
+                                                                          "desc":"a string argument"})})
+    print("_sec=%s" % _sec.get_class_id())
+    print("_sec.gePropertyCount()=%d" % _sec.get_property_count() )
+    print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') )
+    print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') )
+    try:
+        print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') )
+    except:
+        pass
+    print("_sec.getProperties()='%s'" % _sec.get_properties())
+
+    print("Adding another argument")
+    _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL,
+                            kwargs={"dir":"IO",
+                                    "desc":"a boolean argument"})
+    _sec.add_property('Argument-3', _arg3)
+    print("_sec=%s" % _sec.get_class_id())
+    print("_sec.getPropertyCount()=%d" % _sec.get_property_count() )
+    print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') )
+    print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') )
+    print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') )
+
+    print("_arg3.mapEncode()='%s'" % _arg3.map_encode() )
+
+    _secmap = _sec.map_encode()
+    print("_sec.mapEncode()='%s'" % _secmap )
+
+    _sec2 = SchemaEventClass( _map=_secmap )
+
+    print("_sec=%s" % _sec.get_class_id())
+    print("_sec2=%s" % _sec2.get_class_id())
+
+    _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage",
+                                                     "_class_name":   "myOtherClass",
+                                                     "_type":         "_data"},
+                                      "_desc": "A test data object",
+                                      "_values":
+                                          {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8,
+                                                     "access": "RO",
+                                                     "index": True,
+                                                     "unit": "degrees"},
+                                           "prop2": {"amqp_type": qmfTypes.TYPE_UINT8,
+                                                     "access": "RW",
+                                                     "index": True,
+                                                     "desc": "The Second Property(tm)",
+                                                     "unit": "radians"},
+                                           "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
+                                                           "unit": "seconds",
+                                                           "desc": "time until I retire"},
+                                           "meth1": {"_desc": "A test method",
+                                                     "_arguments":
+                                                         {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
+                                                                   "desc": "an argument 1",
+                                                                   "dir":  "I"},
+                                                          "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
+                                                                   "dir":  "IO",
+                                                                   "desc": "some weird boolean"}}},
+                                           "meth2": {"_desc": "A test method",
+                                                     "_arguments":
+                                                         {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
+                                                                     "desc": "an 'nuther argument",
+                                                                     "dir":
+                                                                         "I"}}}},
+                                      "_subtypes":
+                                          {"prop1":"qmfProperty",
+                                           "prop2":"qmfProperty",
+                                           "statistics":"qmfProperty",
+                                           "meth1":"qmfMethod",
+                                           "meth2":"qmfMethod"},
+                                      "_primary_key_names": ["prop2", "prop1"]})
+
+    print("_soc='%s'" % _soc)
+
+    print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names())
+
+    print("_soc.getPropertyCount='%d'" % _soc.get_property_count())
+    print("_soc.getProperties='%s'" % _soc.get_properties())
+    print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2'))
+
+    print("_soc.getMethodCount='%d'" % _soc.get_method_count())
+    print("_soc.getMethods='%s'" % _soc.get_methods())
+    print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2'))
+
+    _socmap = _soc.map_encode()
+    print("_socmap='%s'" % _socmap)
+    _soc2 = SchemaObjectClass( _map=_socmap )
+    print("_soc='%s'" % _soc)
+    print("_soc2='%s'" % _soc2)
+
+    if _soc2.get_class_id() == _soc.get_class_id():
+        print("soc and soc2 are the same schema")
+
+
+    logging.info( "******** Messing around with ObjectIds ********" )
+
+
+    qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} )
+    print("qd='%s':" % qd)
+
+    print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4))
+
+    print("qd map='%s'" % qd.map_encode())
+    print("qd getProperty('prop4')='%s'" % qd.get_value("prop4"))
+    qd.set_value("prop4", 4, "A test property called 4")
+    print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4"))
+    qd.prop4 = 9
+    print("qd.prop4 = 9 ='%s'" % qd.prop4)
+    qd["prop4"] = 11
+    print("qd[prop4] = 11 ='%s'" % qd["prop4"])
+
+    print("qd.mapEncode()='%s'" % qd.map_encode())
+    _qd2 = QmfData( _map = qd.map_encode() )
+    print("_qd2.mapEncode()='%s'" % _qd2.map_encode())
+
+    _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666,
+                                              "prop2": 0}},
+                                agent="some agent name?",
+                                _schema = _soc)
+
+    print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode())
+
+    _qmfDesc1._set_schema( _soc )
+
+    print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2"))
+    print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id())
+    print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id())
+
+
+    _qmfDescMap = _qmfDesc1.map_encode()
+    print("_qmfDescMap='%s'" % _qmfDescMap)
+
+    _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc )
+
+    print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode())
+    print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2"))
+    print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id())
+
+
+    logging.info( "******** Messing around with QmfEvents ********" )
+
+
+    _qmfevent1 = QmfEvent( _timestamp = 1111,
+                           _schema = _sec,
+                           _values = {"Argument-1": 77, 
+                                      "Argument-3": True,
+                                      "Argument-2": "a string"})
+    print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode())
+    print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp())
+
+    _qmfevent1Map = _qmfevent1.map_encode()
+
+    _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec)
+    print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode())
+
+
+    logging.info( "******** Messing around with Queries ********" )
+
+    _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
+                                    QmfQueryPredicate({QmfQuery.LOGIC_AND:
+                                                           [{QmfQuery.CMP_EQ: ["vendor",  "AVendor"]},
+                                                            {QmfQuery.CMP_EQ: ["product", "SomeProduct"]},
+                                                            {QmfQuery.CMP_EQ: ["name", "Thingy"]},
+                                                            {QmfQuery.LOGIC_OR:
+                                                                 [{QmfQuery.CMP_LE: ["temperature", -10]},
+                                                                  {QmfQuery.CMP_FALSE: None},
+                                                                  {QmfQuery.CMP_EXISTS: ["namey"]}]}]}))
+
+    print("_q1.mapEncode() = [%s]" % _q1.map_encode())
diff --git a/qpid/python/qmf2/tests/__init__.py b/qpid/python/qmf2/tests/__init__.py
new file mode 100644
index 0000000..2e742b7
--- /dev/null
+++ b/qpid/python/qmf2/tests/__init__.py
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import agent_discovery, basic_query, basic_method, obj_gets, events
diff --git a/qpid/python/qmf2/tests/agent_discovery.py b/qpid/python/qmf2/tests/agent_discovery.py
new file mode 100644
index 0000000..19ed79c
--- /dev/null
+++ b/qpid/python/qmf2/tests/agent_discovery.py
@@ -0,0 +1,320 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+import qmf2.common
+import qmf2.console
+import qmf2.agent
+
+
+class _testNotifier(qmf.qmfCommon.Notifier):
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # note: called by qmf daemon thread
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait
+        # for qmf to generate work
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()
+            return True
+        return False
+
+
+class _agentApp(Thread):
+    def __init__(self, name, heartbeat):
+        Thread.__init__(self)
+        self.notifier = _testNotifier()
+        self.agent = qmf.qmfAgent.Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+        # No database needed for this test
+        self.running = True
+        self.start()
+
+    def connect_agent(self, broker_url):
+        # broker_url = "user/passwd@hostname:port"
+        self.conn = qpid.messaging.Connection(broker_url.host,
+                                         broker_url.port,
+                                         broker_url.user,
+                                         broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+
+    def disconnect_agent(self, timeout):
+        if self.conn:
+            self.agent.remove_connection(timeout)
+
+    def shutdown_agent(self, timeout):
+        self.agent.destroy(timeout)
+
+    def stop(self):
+        self.running = False
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+    def configure(self, config):
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent1 = _agentApp("agent1", 1)
+        self.agent1.connect_agent(self.broker)
+        self.agent2 = _agentApp("agent2", 1)
+        self.agent2.connect_agent(self.broker)
+
+    def tearDown(self):
+        if self.agent1:
+            self.agent1.shutdown_agent(10)
+            self.agent1.stop()
+            self.agent1 = None
+        if self.agent2:
+            self.agent2.shutdown_agent(10)
+            self.agent2.stop()
+            self.agent2 = None
+
+    def test_discover_all(self):
+        # create console
+        # enable agent discovery
+        # wait
+        # expect agent add for agent1 and agent2
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+        self.console.enable_agent_discovery()
+
+        agent1_found = agent2_found = False
+        wi = self.console.get_next_workitem(timeout=3)
+        while wi and not (agent1_found and agent2_found):
+            if wi.get_type() == wi.AGENT_ADDED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_found = True
+                    elif agent.get_name() == "agent2":
+                        agent2_found = True
+                    else:
+                        self.fail("Unexpected agent name received: %s" %
+                                  agent.get_name())
+                    if agent1_found and agent2_found:
+                        break;
+
+            wi = self.console.get_next_workitem(timeout=3)
+
+        self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
+
+        self.console.destroy(10)
+
+
+    def test_discover_one(self):
+        # create console
+        # enable agent discovery, filter for agent1 only
+        # wait until timeout
+        # expect agent add for agent1 only
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        query = qmf.qmfCommon.QmfQuery.create_predicate(
+                           qmf.qmfCommon.QmfQuery.TARGET_AGENT,
+                           qmf.qmfCommon.QmfQueryPredicate({qmf.qmfCommon.QmfQuery.CMP_EQ:
+                                 [qmf.qmfCommon.QmfQuery.KEY_AGENT_NAME, "agent1"]}))
+        self.console.enable_agent_discovery(query)
+
+        agent1_found = agent2_found = False
+        wi = self.console.get_next_workitem(timeout=3)
+        while wi:
+            if wi.get_type() == wi.AGENT_ADDED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_found = True
+                    elif agent.get_name() == "agent2":
+                        agent2_found = True
+                    else:
+                        self.fail("Unexpected agent name received: %s" %
+                                  agent.get_name())
+
+            wi = self.console.get_next_workitem(timeout=2)
+
+        self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered")
+
+        self.console.destroy(10)
+
+
+    def test_heartbeat(self):
+        # create console with 2 sec agent timeout
+        # enable agent discovery, find all agents
+        # stop agent1, expect timeout notification
+        # stop agent2, expect timeout notification
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=2)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+        self.console.enable_agent_discovery()
+
+        agent1_found = agent2_found = False
+        wi = self.console.get_next_workitem(timeout=4)
+        while wi and not (agent1_found and agent2_found):
+            if wi.get_type() == wi.AGENT_ADDED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_found = True
+                    elif agent.get_name() == "agent2":
+                        agent2_found = True
+                    else:
+                        self.fail("Unexpected agent name received: %s" %
+                                  agent.get_name())
+                    if agent1_found and agent2_found:
+                        break;
+
+            wi = self.console.get_next_workitem(timeout=4)
+
+        self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
+
+        # now kill agent1 and wait for expiration
+
+        agent1 = self.agent1
+        self.agent1 = None
+        agent1.shutdown_agent(10)
+        agent1.stop()
+
+        wi = self.console.get_next_workitem(timeout=4)
+        while wi is not None:
+            if wi.get_type() == wi.AGENT_DELETED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_found = False
+                    else:
+                        self.fail("Unexpected agent_deleted received: %s" %
+                                  agent.get_name())
+                    if not agent1_found:
+                        break;
+
+            wi = self.console.get_next_workitem(timeout=4)
+
+        self.assertFalse(agent1_found, "agent1 did not delete!")
+
+        # now kill agent2 and wait for expiration
+
+        agent2 = self.agent2
+        self.agent2 = None
+        agent2.shutdown_agent(10)
+        agent2.stop()
+
+        wi = self.console.get_next_workitem(timeout=4)
+        while wi is not None:
+            if wi.get_type() == wi.AGENT_DELETED:
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent2":
+                        agent2_found = False
+                    else:
+                        self.fail("Unexpected agent_deleted received: %s" %
+                                  agent.get_name())
+                    if not agent2_found:
+                        break;
+
+            wi = self.console.get_next_workitem(timeout=4)
+
+        self.assertFalse(agent2_found, "agent2 did not delete!")
+
+        self.console.destroy(10)
+
+
+    def test_find_agent(self):
+        # create console
+        # do not enable agent discovery
+        # find agent1, expect success
+        # find agent-none, expect failure
+        # find agent2, expect success
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        agent1 = self.console.find_agent("agent1", timeout=3)
+        self.assertTrue(agent1 and agent1.get_name() == "agent1")
+
+        no_agent = self.console.find_agent("agent-none", timeout=3)
+        self.assertTrue(no_agent == None)
+
+        agent2 = self.console.find_agent("agent2", timeout=3)
+        self.assertTrue(agent2 and agent2.get_name() == "agent2")
+
+        self.console.removeConnection(self.conn, 10)
+        self.console.destroy(10)
+
+
diff --git a/qpid/python/qmf2/tests/agent_test.py b/qpid/python/qmf2/tests/agent_test.py
new file mode 100644
index 0000000..14d8ada
--- /dev/null
+++ b/qpid/python/qmf2/tests/agent_test.py
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import logging
+import time
+import unittest
+from threading import Semaphore
+
+
+from qpid.messaging import *
+from qmf2.common import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
+                         QmfEvent, SchemaMethod, Notifier, SchemaClassId,
+                         WorkItem) 
+from qmf2.agent import (Agent, QmfAgentData)
+
+
+
+class ExampleNotifier(Notifier):
+    def __init__(self):
+        self._sema4 = Semaphore(0)   # locked
+
+    def indication(self):
+        self._sema4.release()
+
+    def waitForWork(self):
+        print("Waiting for event...")
+        self._sema4.acquire()
+        print("...event present")
+
+
+
+
+class QmfTest(unittest.TestCase):
+    def test_begin(self):
+        print("!!! being test")
+
+    def test_end(self):
+        print("!!! end test")
+
+
+#
+# An example agent application
+#
+
+
+if __name__ == '__main__':
+    _notifier = ExampleNotifier()
+    _agent = Agent( "qmf.testAgent", _notifier=_notifier )
+
+    # Dynamically construct a class schema
+
+    _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+                                 _desc="A test data schema",
+                                 _object_id_names=["index1", "index2"] )
+    # add properties
+    _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+    _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+    # these two properties are statistics
+    _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+    _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+    # These two properties can be set via the method call
+    _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+    _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+
+    # add method
+    _meth = SchemaMethod( _desc="Method to set string and int in object." )
+    _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+    _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+    _schema.add_method( "set_meth", _meth )
+
+    # Add schema to Agent
+
+    _agent.register_object_class(_schema)
+
+    # instantiate managed data objects matching the schema
+
+    _obj1 = QmfAgentData( _agent, _schema=_schema )
+    _obj1.set_value("index1", 100)
+    _obj1.set_value("index2", "a name" )
+    _obj1.set_value("set_string", "UNSET")
+    _obj1.set_value("set_int", 0)
+    _obj1.set_value("query_count", 0)
+    _obj1.set_value("method_call_count", 0)
+    _agent.add_object( _obj1 )
+
+    _agent.add_object( QmfAgentData( _agent, _schema=_schema,
+                                     _values={"index1":99, 
+                                              "index2": "another name",
+                                              "set_string": "UNSET",
+                                              "set_int": 0,
+                                              "query_count": 0,
+                                              "method_call_count": 0} ))
+
+    # add an "unstructured" object to the Agent
+    _obj2 = QmfAgentData(_agent, _object_id="01545")
+    _obj2.set_value("field1", "a value")
+    _obj2.set_value("field2", 2)
+    _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+    _obj2.set_value("field4", ["a", "list", "value"])
+    _agent.add_object(_obj2)
+
+
+    ## Now connect to the broker
+
+    _c = Connection("localhost")
+    _c.connect()
+    _agent.setConnection(_c)
+
+    _error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."})
+
+    _done = False
+    while not _done:
+        # try:
+        _notifier.waitForWork()
+
+        _wi = _agent.get_next_workitem(timeout=0)
+        while _wi:
+
+            if _wi.get_type() == WorkItem.METHOD_CALL:
+                mc = _wi.get_params()
+            
+                if mc.get_name() == "set_meth":
+                    print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id())
+                    print("!!! args='%s'" % str(mc.get_args()))
+                    print("!!! userid=%s" % str(mc.get_user_id()))
+                    print("!!! handle=%s" % _wi.get_handle())
+                    _agent.method_response(_wi.get_handle(),
+                                           {"rc1": 100, "rc2": "Success"})
+                else:
+                    print("!!! Unknown Method name = %s" % mc.get_name())
+                    _agent.method_response(_wi.get_handle(), _error=_error_data)
+            else:
+                print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params())))
+
+            _agent.release_workitem(_wi)
+            _wi = _agent.get_next_workitem(timeout=0)
+            #    except:
+            #        print( "shutting down...")
+            #        _done = True
+
+    print( "Removing connection... TBD!!!" )
+    #_myConsole.remove_connection( _c, 10 )
+
+    print( "Destroying agent... TBD!!!" )
+    #_myConsole.destroy( 10 )
+
+    print( "******** agent test done ********" )
+
+
+
diff --git a/qpid/python/qmf2/tests/basic_method.py b/qpid/python/qmf2/tests/basic_method.py
new file mode 100644
index 0000000..c5098b5
--- /dev/null
+++ b/qpid/python/qmf2/tests/basic_method.py
@@ -0,0 +1,348 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+                           SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                           QmfData, QmfQueryPredicate, WorkItem) 
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent, MethodCallParams)
+
+
+class _testNotifier(Notifier):
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # note: called by qmf daemon thread
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait
+        # for qmf to generate work
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()
+            return True
+        return False
+
+
+class _agentApp(Thread):
+    def __init__(self, name, heartbeat):
+        Thread.__init__(self)
+        self.notifier = _testNotifier()
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Dynamically construct a management database
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+                                     _desc="A test data schema",
+                                     _object_id_names=["index1", "index2"] )
+        # add properties
+        _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+        _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        # these two properties are statistics
+        _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+        _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # These two properties can be set via the method call
+        _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # add method
+        _meth = SchemaMethod( _desc="Method to set string and int in object." )
+        _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+        _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+        _schema.add_method( "set_meth", _meth )
+
+        # Add schema to Agent
+
+        self.agent.register_object_class(_schema)
+
+        # instantiate managed data objects matching the schema
+
+        _obj1 = QmfAgentData( self.agent, _schema=_schema )
+        _obj1.set_value("index1", 100)
+        _obj1.set_value("index2", "a name" )
+        _obj1.set_value("set_string", "UNSET")
+        _obj1.set_value("set_int", 0)
+        _obj1.set_value("query_count", 0)
+        _obj1.set_value("method_call_count", 0)
+        self.agent.add_object( _obj1 )
+
+        self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+                                             _values={"index1":99, 
+                                                      "index2": "another name",
+                                                      "set_string": "UNSET",
+                                                      "set_int": 0,
+                                                      "query_count": 0,
+                                                      "method_call_count": 0} ))
+
+        # add an "unstructured" object to the Agent
+        _obj2 = QmfAgentData(self.agent, _object_id="01545")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 2)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        self.agent.add_object(_obj2)
+
+        self.running = True
+        self.start()
+
+    def connect_agent(self, broker_url):
+        # broker_url = "user/passwd@hostname:port"
+        self.conn = qpid.messaging.Connection(broker_url.host,
+                                         broker_url.port,
+                                         broker_url.user,
+                                         broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+
+    def disconnect_agent(self, timeout):
+        if self.conn:
+            self.agent.remove_connection(timeout)
+
+    def shutdown_agent(self, timeout):
+        self.agent.destroy(timeout)
+
+    def stop(self):
+        self.running = False
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        # Agent application main processing loop
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                if wi.get_type() == WorkItem.METHOD_CALL:
+                    mc = wi.get_params()
+                    if not isinstance(mc, MethodCallParams):
+                        raise Exception("Unexpected method call parameters")
+
+                    if mc.get_name() == "set_meth":
+                        obj = self.agent.get_object(mc.get_object_id())
+                        if obj is None:
+                            error_info = QmfData.create({"code": -2, 
+                                                         "description":
+                                                             "Bad Object Id."})
+                            self.agent.method_response(wi.get_handle(),
+                                                       _error=error_info)
+                        else:
+                            obj.inc_value("method_call_count")
+                            if "arg_int" in mc.get_args():
+                                obj.set_value("set_int", mc.get_args()["arg_int"])
+                            if "arg_str" in mc.get_args():
+                                obj.set_value("set_string", mc.get_args()["arg_str"])
+                            self.agent.method_response(wi.get_handle(),
+                                                       {"code" : 0})
+                    elif mc.get_name() == "a_method":
+                        obj = self.agent.get_object(mc.get_object_id())
+                        if obj is None:
+                            error_info = QmfData.create({"code": -3, 
+                                                         "description":
+                                                             "Unknown object id."})
+                            self.agent.method_response(wi.get_handle(),
+                                                       _error=error_info)
+                        elif obj.get_object_id() != "01545":
+                            error_info = QmfData.create({"code": -4, 
+                                                         "description":
+                                                             "Unexpected id."})
+                            self.agent.method_response(wi.get_handle(),
+                                                       _error=error_info)
+                        else:
+                            args = mc.get_args()
+                            if ("arg1" in args and args["arg1"] == 1 and
+                                "arg2" in args and args["arg2"] == "Now set!"
+                                and "arg3" in args and args["arg3"] == 1966): 
+                                self.agent.method_response(wi.get_handle(),
+                                                           {"code" : 0})
+                            else:
+                                error_info = QmfData.create({"code": -5, 
+                                                             "description":
+                                                                 "Bad Args."})
+                                self.agent.method_response(wi.get_handle(),
+                                                           _error=error_info)
+                    else:
+                        error_info = QmfData.create({"code": -1, 
+                                                     "description":
+                                                         "Unknown method call."})
+                        self.agent.method_response(wi.get_handle(), _error=error_info)
+
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+    def configure(self, config):
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent1 = _agentApp("agent1", 1)
+        self.agent1.connect_agent(self.broker)
+        self.agent2 = _agentApp("agent2", 1)
+        self.agent2.connect_agent(self.broker)
+
+    def tearDown(self):
+        if self.agent1:
+            self.agent1.shutdown_agent(10)
+            self.agent1.stop()
+            self.agent1 = None
+        if self.agent2:
+            self.agent2.shutdown_agent(10)
+            self.agent2.stop()
+            self.agent2 = None
+
+    def test_described_obj(self):
+        # create console
+        # find agents
+        # synchronous query for all objects in schema
+        # method call on each object
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
+                                              QmfQueryPredicate(
+                    {QmfQuery.LOGIC_AND:
+                     [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]},
+                      {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+                                         "MyPackage"]}]}))
+
+            obj_list = self.console.doQuery(agent, query)
+            self.assertTrue(len(obj_list) == 2)
+            for obj in obj_list:
+                mr = obj.invoke_method( "set_meth", {"arg_int": -99,
+                                                     "arg_str": "Now set!"},
+                                        _timeout=3)
+                self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult))
+                self.assertTrue(mr.succeeded())
+                self.assertTrue(mr.get_argument("code") == 0)
+
+                self.assertTrue(obj.get_value("method_call_count") == 0)
+                self.assertTrue(obj.get_value("set_string") == "UNSET")
+                self.assertTrue(obj.get_value("set_int") == 0)
+
+                obj.refresh()
+
+                self.assertTrue(obj.get_value("method_call_count") == 1)
+                self.assertTrue(obj.get_value("set_string") == "Now set!")
+                self.assertTrue(obj.get_value("set_int") == -99)
+
+        self.console.destroy(10)
+
+
+    def test_bad_method(self):
+        # create console
+        # find agents
+        # synchronous query for all objects in schema
+        # invalid method call on each object
+        #  - should throw a ValueError
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
+                                              QmfQueryPredicate(
+                    {QmfQuery.LOGIC_AND:
+                     [{QmfQuery.CMP_EXISTS: [SchemaClassId.KEY_PACKAGE]},
+                      {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE,
+                                         "MyPackage"]}]}))
+
+            obj_list = self.console.doQuery(agent, query)
+            self.assertTrue(len(obj_list) == 2)
+            for obj in obj_list:
+                self.failUnlessRaises(ValueError,
+                                      obj.invoke_method,
+                                      "unknown_meth", 
+                                      {"arg1": -99, "arg2": "Now set!"},
+                                      _timeout=3)
+        self.console.destroy(10)
+
+
+    def test_managed_obj(self):
+        # create console
+        # find agents
+        # synchronous query for a managed object
+        # method call on each object
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545")
+            obj_list = self.console.doQuery(agent, query)
+
+            self.assertTrue(isinstance(obj_list, type([])))
+            self.assertTrue(len(obj_list) == 1)
+            obj = obj_list[0]
+
+            mr = obj.invoke_method("a_method",
+                                   {"arg1": 1,
+                                    "arg2": "Now set!",
+                                    "arg3": 1966},
+                                   _timeout=3)
+            self.assertTrue(isinstance(mr, qmf.qmfConsole.MethodResult))
+            self.assertTrue(mr.succeeded())
+            self.assertTrue(mr.get_argument("code") == 0)
+            # @todo refresh and verify changes
+
+        self.console.destroy(10)
diff --git a/qpid/python/qmf2/tests/basic_query.py b/qpid/python/qmf2/tests/basic_query.py
new file mode 100644
index 0000000..46dc87f
--- /dev/null
+++ b/qpid/python/qmf2/tests/basic_query.py
@@ -0,0 +1,336 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+                           SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                           QmfData, QmfQueryPredicate) 
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # note: called by qmf daemon thread
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait
+        # for qmf to generate work
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()
+            return True
+        return False
+
+
+class _agentApp(Thread):
+    def __init__(self, name, heartbeat):
+        Thread.__init__(self)
+        self.notifier = _testNotifier()
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Dynamically construct a management database
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+                                     _desc="A test data schema",
+                                     _object_id_names=["index1", "index2"] )
+        # add properties
+        _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+        _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        # these two properties are statistics
+        _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+        _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # These two properties can be set via the method call
+        _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # add method
+        _meth = SchemaMethod( _desc="Method to set string and int in object." )
+        _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+        _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+        _schema.add_method( "set_meth", _meth )
+
+        # Add schema to Agent
+
+        self.agent.register_object_class(_schema)
+
+        # instantiate managed data objects matching the schema
+
+        _obj1 = QmfAgentData( self.agent, _schema=_schema )
+        _obj1.set_value("index1", 100)
+        _obj1.set_value("index2", "a name" )
+        _obj1.set_value("set_string", "UNSET")
+        _obj1.set_value("set_int", 0)
+        _obj1.set_value("query_count", 0)
+        _obj1.set_value("method_call_count", 0)
+        self.agent.add_object( _obj1 )
+
+        self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+                                             _values={"index1":99, 
+                                                      "index2": "another name",
+                                                      "set_string": "UNSET",
+                                                      "set_int": 0,
+                                                      "query_count": 0,
+                                                      "method_call_count": 0} ))
+
+        # add an "unstructured" object to the Agent
+        _obj2 = QmfAgentData(self.agent, _object_id="01545")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 2)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        self.agent.add_object(_obj2)
+
+        self.running = True
+        self.start()
+
+    def connect_agent(self, broker_url):
+        # broker_url = "user/passwd@hostname:port"
+        self.conn = qpid.messaging.Connection(broker_url.host,
+                                         broker_url.port,
+                                         broker_url.user,
+                                         broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+
+    def disconnect_agent(self, timeout):
+        if self.conn:
+            self.agent.remove_connection(timeout)
+
+    def shutdown_agent(self, timeout):
+        self.agent.destroy(timeout)
+
+    def stop(self):
+        self.running = False
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+    def configure(self, config):
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent1 = _agentApp("agent1", 1)
+        self.agent1.connect_agent(self.broker)
+        self.agent2 = _agentApp("agent2", 1)
+        self.agent2.connect_agent(self.broker)
+
+    def tearDown(self):
+        if self.agent1:
+            self.agent1.shutdown_agent(10)
+            self.agent1.stop()
+            self.agent1 = None
+        if self.agent2:
+            self.agent2.shutdown_agent(10)
+            self.agent2.stop()
+            self.agent2 = None
+
+    def test_all_oids(self):
+        # create console
+        # find agents
+        # synchronous query for all objects by id
+        # verify known object ids are returned
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
+            oid_list = self.console.doQuery(agent, query)
+
+            self.assertTrue(isinstance(oid_list, type([])), 
+                            "Unexpected return type")
+            self.assertTrue(len(oid_list) == 3, "Wrong count")
+            self.assertTrue('100a name' in oid_list)
+            self.assertTrue('99another name' in oid_list)
+            self.assertTrue('01545' in oid_list)
+
+        self.console.destroy(10)
+
+
+    def test_direct_oids(self):
+        # create console
+        # find agents
+        # synchronous query for each objects
+        # verify objects and schemas are correct
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            for oid in ['100a name', '99another name', '01545']:
+                query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, oid)
+                obj_list = self.console.doQuery(agent, query)
+
+                self.assertTrue(isinstance(obj_list, type([])), 
+                                "Unexpected return type")
+                self.assertTrue(len(obj_list) == 1)
+                obj = obj_list[0]
+                self.assertTrue(isinstance(obj, QmfData))
+                self.assertTrue(obj.get_object_id() == oid)
+
+                if obj.is_described():
+                    self.assertTrue(oid in ['100a name', '99another name'])
+                    schema_id = obj.get_schema_class_id()
+                    self.assertTrue(isinstance(schema_id, SchemaClassId))
+                else:
+                    self.assertTrue(oid == "01545")
+
+
+
+        self.console.destroy(10)
+
+
+
+    def test_packages(self):
+        # create console
+        # find agents
+        # synchronous query all package names
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
+            package_list = self.console.doQuery(agent, query)
+            self.assertTrue(len(package_list) == 1)
+            self.assertTrue('MyPackage' in package_list)
+
+
+        self.console.destroy(10)
+
+
+
+    def test_predicate_schema_id(self):
+        # create console
+        # find agents
+        # synchronous query for all schema by package name
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+                                              QmfQueryPredicate(
+                    {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, 
+                                       "MyPackage"]}))
+
+            schema_list = self.console.doQuery(agent, query)
+            self.assertTrue(len(schema_list))
+            for schema in schema_list:
+                self.assertTrue(schema.get_class_id().get_package_name() ==
+                                "MyPackage")
+
+
+        self.console.destroy(10)
+
+
+
+    def test_predicate_no_match(self):
+        # create console
+        # find agents
+        # synchronous query for all schema by package name
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+                                              QmfQueryPredicate(
+                    {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, 
+                                       "No-Such-Package"]}))
+
+            schema_list = self.console.doQuery(agent, query)
+            self.assertTrue(len(schema_list) == 0)
+
+        self.console.destroy(10)
+
+
diff --git a/qpid/python/qmf2/tests/console_test.py b/qpid/python/qmf2/tests/console_test.py
new file mode 100644
index 0000000..ac0e064
--- /dev/null
+++ b/qpid/python/qmf2/tests/console_test.py
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import logging
+import time
+from threading import Semaphore
+
+
+from qpid.messaging import *
+from qmf2.common import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey,
+                       SchemaClassId, SchemaClass, QmfData) 
+from qmf2.console import Console
+
+
+class ExampleNotifier(Notifier):
+    def __init__(self):
+        self._sema4 = Semaphore(0)   # locked
+
+    def indication(self):
+        self._sema4.release()
+
+    def waitForWork(self):
+        print("Waiting for event...")
+        self._sema4.acquire()
+        print("...event present")
+
+
+logging.getLogger().setLevel(logging.INFO)
+
+print( "Starting Connection" )
+_c = Connection("localhost")
+_c.connect()
+
+print( "Starting Console" )
+
+_notifier = ExampleNotifier()
+_myConsole = Console(notifier=_notifier)
+_myConsole.addConnection( _c )
+
+# Allow discovery only for the agent named "qmf.testAgent"
+# @todo: replace "manual" query construction with 
+# a formal class-based Query API
+_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, 
+                                   QmfQueryPredicate({QmfQuery.CMP_EQ:
+                                                          [QmfQuery.KEY_AGENT_NAME,
+                                                           "qmf.testAgent"]}))
+_myConsole.enable_agent_discovery(_query)
+
+_done = False
+while not _done:
+#    try:
+    _notifier.waitForWork()
+
+    _wi = _myConsole.get_next_workitem(timeout=0)
+    while _wi:
+        print("!!! work item received %d:%s" % (_wi.get_type(),
+                                                str(_wi.get_params())))
+
+
+        if _wi.get_type() == _wi.AGENT_ADDED:
+            _agent = _wi.get_params().get("agent")
+            if not _agent:
+                print("!!!! AGENT IN REPLY IS NULL !!! ")
+
+            _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
+            oid_list = _myConsole.doQuery(_agent, _query)
+
+            print("!!!************************** REPLY=%s" % oid_list)
+
+            for oid in oid_list:
+                _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, 
+                                            oid)
+                obj_list = _myConsole.doQuery(_agent, _query)
+
+                print("!!!************************** REPLY=%s" % obj_list)
+
+                if obj_list is None:
+                    obj_list={}
+
+                for obj in obj_list:
+                    resp = obj.invoke_method( "set_meth", 
+                                              {"arg_int": -11,
+                                               "arg_str": "are we not goons?"},
+                                              None,
+                                              3)
+                    if resp is None:
+                        print("!!!*** NO RESPONSE FROM METHOD????") 
+                    else:
+                        print("!!! method succeeded()=%s" % resp.succeeded())
+                        print("!!! method exception()=%s" % resp.get_exception())
+                        print("!!! method get args() = %s" % resp.get_arguments())
+
+                    if not obj.is_described():
+                        resp = obj.invoke_method( "bad method", 
+                                                  {"arg_int": -11,
+                                                   "arg_str": "are we not goons?"},
+                                                  None,
+                                                  3)
+                        if resp is None:
+                            print("!!!*** NO RESPONSE FROM METHOD????") 
+                        else:
+                            print("!!! method succeeded()=%s" % resp.succeeded())
+                            print("!!! method exception()=%s" % resp.get_exception())
+                            print("!!! method get args() = %s" % resp.get_arguments())
+
+
+            #---------------------------------
+            #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name")
+
+            #obj_list = _myConsole.doQuery(_agent, _query)
+
+            #---------------------------------
+
+            # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
+
+            # package_list = _myConsole.doQuery(_agent, _query)
+
+            # for pname in package_list:
+            #     print("!!! Querying for schema from package: %s" % pname)
+            #     _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID,
+            #                                        QmfQueryPredicate(
+            #             {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]}))
+
+            #     schema_id_list = _myConsole.doQuery(_agent, _query)
+            #     for sid in schema_id_list:
+            #         _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+            #                                            QmfQueryPredicate(
+            #                 {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID,
+            #                                    sid.map_encode()]}))
+
+            #         schema_list = _myConsole.doQuery(_agent, _query)
+            #         for schema in schema_list:
+            #             sid = schema.get_class_id()
+            #             _query = QmfQuery.create_predicate(
+            #                 QmfQuery.TARGET_OBJECT_ID,
+            #                 QmfQueryPredicate({QmfQuery.CMP_EQ:
+            #                                        [QmfData.KEY_SCHEMA_ID,
+            #                                         sid.map_encode()]}))
+
+            #             oid_list = _myConsole.doQuery(_agent, _query)
+            #             for oid in oid_list:
+            #                 _query = QmfQuery.create_id(
+            #                     QmfQuery.TARGET_OBJECT, oid)
+            #                 _reply = _myConsole.doQuery(_agent, _query)
+
+            #                 print("!!!************************** REPLY=%s" % _reply)
+
+
+        _myConsole.release_workitem(_wi)
+        _wi = _myConsole.get_next_workitem(timeout=0)
+#    except:
+#        logging.info( "shutting down..." )
+#        _done = True
+
+print( "Removing connection" )
+_myConsole.removeConnection( _c, 10 )
+
+print( "Destroying console:" )
+_myConsole.destroy( 10 )
+
+print( "******** console test done ********" )
diff --git a/qpid/python/qmf2/tests/events.py b/qpid/python/qmf2/tests/events.py
new file mode 100644
index 0000000..8ce534c
--- /dev/null
+++ b/qpid/python/qmf2/tests/events.py
@@ -0,0 +1,193 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import time
+import datetime
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+                         SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                         QmfData, QmfQueryPredicate, SchemaEventClass,
+                         QmfEvent)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # note: called by qmf daemon thread
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait
+        # for qmf to generate work
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()
+            return True
+        return False
+
+
+class _agentApp(Thread):
+    def __init__(self, name, broker_url, heartbeat):
+        Thread.__init__(self)
+        self.timeout = 3
+        self.broker_url = broker_url
+        self.notifier = _testNotifier()
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Dynamically construct a management database
+
+        _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage",
+                                                          "MyClass",
+                                                          stype=SchemaClassId.TYPE_EVENT),
+                                   _desc="A test event schema")
+        # add properties
+        _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8))
+        _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        # Add schema to Agent
+        self.schema = _schema
+        self.agent.register_object_class(_schema)
+
+        self.running = False
+
+    def start_app(self):
+        self.running = True
+        self.start()
+
+    def stop_app(self):
+        self.running = False
+        # wake main thread
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(self.timeout)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        # broker_url = "user/passwd@hostname:port"
+        conn = qpid.messaging.Connection(self.broker_url.host,
+                                         self.broker_url.port,
+                                         self.broker_url.user,
+                                         self.broker_url.password)
+        conn.connect()
+        self.agent.set_connection(conn)
+
+        counter = 1
+        while self.running:
+            # post an event every second
+            event = QmfEvent.create(long(time.time() * 1000),
+                                    QmfEvent.SEV_WARNING,
+                                    {"prop-1": counter,
+                                     "prop-2": str(datetime.datetime.utcnow())},
+                                    _schema=self.schema)
+            counter += 1
+            self.agent.raise_event(event)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+            self.notifier.wait_for_work(1)
+
+        self.agent.remove_connection(self.timeout)
+        self.agent.destroy(self.timeout)
+
+
+
+class BaseTest(unittest.TestCase):
+    def configure(self, config):
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent1 = _agentApp("agent1", self.broker, 1)
+        self.agent1.start_app()
+        self.agent2 = _agentApp("agent2", self.broker, 1)
+        self.agent2.start_app()
+
+    def tearDown(self):
+        if self.agent1:
+            self.agent1.stop_app()
+            self.agent1 = None
+        if self.agent2:
+            self.agent2.stop_app()
+            self.agent2 = None
+
+    def test_get_events(self):
+        # create console
+        # find agents
+
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        # find the agents
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+        # now wait for events
+        agent1_events = agent2_events = 0
+        wi = self.console.get_next_workitem(timeout=4)
+        while wi:
+            if wi.get_type() == wi.EVENT_RECEIVED:
+                event = wi.get_params().get("event")
+                self.assertTrue(isinstance(event, QmfEvent))
+                self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING)
+                self.assertTrue(event.get_value("prop-1") > 0)
+
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_events += 1
+                    elif agent.get_name() == "agent2":
+                        agent2_events += 1
+                    else:
+                        self.fail("Unexpected agent name received: %s" %
+                                  agent.get_name())
+                    if agent1_events and agent2_events:
+                        break;
+
+            wi = self.console.get_next_workitem(timeout=4)
+
+        self.assertTrue(agent1_events > 0 and agent2_events > 0)
+
+        self.console.destroy(10)
+
+
+
+
diff --git a/qpid/python/qmf2/tests/obj_gets.py b/qpid/python/qmf2/tests/obj_gets.py
new file mode 100644
index 0000000..e585754
--- /dev/null
+++ b/qpid/python/qmf2/tests/obj_gets.py
@@ -0,0 +1,399 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+                         SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                         QmfData, QmfQueryPredicate) 
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # note: called by qmf daemon thread
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait
+        # for qmf to generate work
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()
+            return True
+        return False
+
+
+class _agentApp(Thread):
+    def __init__(self, name, heartbeat):
+        Thread.__init__(self)
+        self.notifier = _testNotifier()
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Management Database 
+        # - two different schema packages, 
+        # - two classes within one schema package
+        # - multiple objects per schema package+class
+        # - two "undescribed" objects
+
+        # "package1/class1"
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"),
+                                     _desc="A test data schema - one",
+                                     _object_id_names=["key"] )
+
+        _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+        _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        self.agent.register_object_class(_schema)
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("key", "p1c1_key1")
+        _obj.set_value("count1", 0)
+        _obj.set_value("count2", 0)
+        self.agent.add_object( _obj )
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("key", "p1c1_key2")
+        _obj.set_value("count1", 9)
+        _obj.set_value("count2", 10)
+        self.agent.add_object( _obj )
+
+        # "package1/class2"
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"),
+                                     _desc="A test data schema - two",
+                                     _object_id_names=["name"] )
+        # add properties
+        _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        self.agent.register_object_class(_schema)
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("name", "p1c2_name1")
+        _obj.set_value("string1", "a data string")
+        self.agent.add_object( _obj )
+
+
+        # "package2/class1"
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"),
+                                     _desc="A test data schema - second package",
+                                     _object_id_names=["key"] )
+
+        _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        self.agent.register_object_class(_schema)
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("key", "p2c1_key1")
+        _obj.set_value("counter", 0)
+        self.agent.add_object( _obj )
+
+        _obj = QmfAgentData( self.agent, _schema=_schema )
+        _obj.set_value("key", "p2c1_key2")
+        _obj.set_value("counter", 2112)
+        self.agent.add_object( _obj )
+
+
+        # add two "unstructured" objects to the Agent
+
+        _obj = QmfAgentData(self.agent, _object_id="undesc-1")
+        _obj.set_value("field1", "a value")
+        _obj.set_value("field2", 2)
+        _obj.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj.set_value("field4", ["a", "list", "value"])
+        self.agent.add_object(_obj)
+
+
+        _obj = QmfAgentData(self.agent, _object_id="undesc-2")
+        _obj.set_value("key-1", "a value")
+        _obj.set_value("key-2", 2)
+        self.agent.add_object(_obj)
+
+        self.running = True
+        self.start()
+
+    def connect_agent(self, broker_url):
+        # broker_url = "user/passwd@hostname:port"
+        self.conn = qpid.messaging.Connection(broker_url.host,
+                                         broker_url.port,
+                                         broker_url.user,
+                                         broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+
+    def disconnect_agent(self, timeout):
+        if self.conn:
+            self.agent.remove_connection(timeout)
+
+    def shutdown_agent(self, timeout):
+        self.agent.destroy(timeout)
+
+    def stop(self):
+        self.running = False
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+
+
+class BaseTest(unittest.TestCase):
+    agent_count = 5
+
+    def configure(self, config):
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        self.agents = []
+        for i in range(self.agent_count):
+            agent = _agentApp("agent-" + str(i), 1)
+            agent.connect_agent(self.broker)
+            self.agents.append(agent)
+
+    def tearDown(self):
+        for agent in self.agents:
+            if agent is not None:
+                agent.shutdown_agent(10)
+                agent.stop()
+
+
+    def test_all_agents(self):
+        # create console
+        # find all agents
+        # synchronous query for all objects by id
+        # verify known object ids are returned
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        for agent_app in self.agents:
+            aname = agent_app.agent.get_name()
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+        # console has discovered all agents, now query all undesc-2 objects
+        objs = self.console.get_objects(_object_id="undesc-2", _timeout=5)
+        self.assertTrue(len(objs) == self.agent_count)
+        for obj in objs:
+            self.assertTrue(obj.get_object_id() == "undesc-2")
+
+        # now query all objects from schema "package1"
+        objs = self.console.get_objects(_pname="package1", _timeout=5)
+        self.assertTrue(len(objs) == (self.agent_count * 3))
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+        # now query all objects from schema "package2"
+        objs = self.console.get_objects(_pname="package2", _timeout=5)
+        self.assertTrue(len(objs) == (self.agent_count * 2))
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+        # now query all objects from schema "package1/class2"
+        objs = self.console.get_objects(_pname="package1", _cname="class2", _timeout=5)
+        self.assertTrue(len(objs) == self.agent_count)
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+            self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+        # given the schema identifier from the last query, repeat using the
+        # specific schema id
+        schema_id = objs[0].get_schema_class_id()
+        objs = self.console.get_objects(_schema_id=schema_id, _timeout=5)
+        self.assertTrue(len(objs) == self.agent_count)
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+        self.console.destroy(10)
+
+
+
+    def test_agent_subset(self):
+        # create console
+        # find all agents
+        # synchronous query for all objects by id
+        # verify known object ids are returned
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        agent_list = []
+        for agent_app in self.agents:
+            aname = agent_app.agent.get_name()
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+            agent_list.append(agent)
+
+        # Only use a subset of the agents:
+        agent_list = agent_list[:len(agent_list)/2]
+
+        # console has discovered all agents, now query all undesc-2 objects
+        objs = self.console.get_objects(_object_id="undesc-2",
+                                        _agents=agent_list, _timeout=5)
+        self.assertTrue(len(objs) == len(agent_list))
+        for obj in objs:
+            self.assertTrue(obj.get_object_id() == "undesc-2")
+
+        # now query all objects from schema "package1"
+        objs = self.console.get_objects(_pname="package1",
+                                        _agents=agent_list,
+                                        _timeout=5)
+        self.assertTrue(len(objs) == (len(agent_list) * 3))
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+        # now query all objects from schema "package2"
+        objs = self.console.get_objects(_pname="package2", 
+                                        _agents=agent_list,
+                                        _timeout=5)
+        self.assertTrue(len(objs) == (len(agent_list) * 2))
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+        # now query all objects from schema "package1/class2"
+        objs = self.console.get_objects(_pname="package1", _cname="class2", 
+                                        _agents=agent_list,
+                                        _timeout=5)
+        self.assertTrue(len(objs) == len(agent_list))
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+            self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+        # given the schema identifier from the last query, repeat using the
+        # specific schema id
+        schema_id = objs[0].get_schema_class_id()
+        objs = self.console.get_objects(_schema_id=schema_id, 
+                                        _agents=agent_list,
+                                        _timeout=5)
+        self.assertTrue(len(objs) == len(agent_list))
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+        self.console.destroy(10)
+
+
+
+    def test_single_agent(self):
+        # create console
+        # find all agents
+        # synchronous query for all objects by id
+        # verify known object ids are returned
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        agent_list = []
+        for agent_app in self.agents:
+            aname = agent_app.agent.get_name()
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+            agent_list.append(agent)
+
+        # Only use one agetn
+        agent = agent_list[0]
+
+        # console has discovered all agents, now query all undesc-2 objects
+        objs = self.console.get_objects(_object_id="undesc-2",
+                                        _agents=agent, _timeout=5)
+        self.assertTrue(len(objs) == 1)
+        for obj in objs:
+            self.assertTrue(obj.get_object_id() == "undesc-2")
+
+        # now query all objects from schema "package1"
+        objs = self.console.get_objects(_pname="package1",
+                                        _agents=agent,
+                                        _timeout=5)
+        self.assertTrue(len(objs) == 3)
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+
+        # now query all objects from schema "package2"
+        objs = self.console.get_objects(_pname="package2", 
+                                        _agents=agent,
+                                        _timeout=5)
+        self.assertTrue(len(objs) == 2)
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+
+        # now query all objects from schema "package1/class2"
+        objs = self.console.get_objects(_pname="package1", _cname="class2", 
+                                        _agents=agent,
+                                        _timeout=5)
+        self.assertTrue(len(objs) == 1)
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+            self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+
+        # given the schema identifier from the last query, repeat using the
+        # specific schema id
+        schema_id = objs[0].get_schema_class_id()
+        objs = self.console.get_objects(_schema_id=schema_id, 
+                                        _agents=agent,
+                                        _timeout=5)
+        self.assertTrue(len(objs) == 1)
+        for obj in objs:
+            self.assertTrue(obj.get_schema_class_id() == schema_id)
+
+
+        self.console.destroy(10)
+