blob: 3c530cc060149344da48d5be5df3a9da5a601d5e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest
import logging
from threading import Thread, Event
import qpid.messaging
import qmf2.common
import qmf2.console
import qmf2.agent
class _testNotifier(qmf2.common.Notifier):
def __init__(self):
self._event = Event()
def indication(self):
# note: called by qmf daemon thread
self._event.set()
def wait_for_work(self, timeout):
# note: called by application thread to wait
# for qmf to generate work
self._event.wait(timeout)
timed_out = self._event.isSet() == False
if not timed_out:
self._event.clear()
return True
return False
class _agentApp(Thread):
def __init__(self, name, heartbeat):
Thread.__init__(self)
self.notifier = _testNotifier()
self.agent = qmf2.agent.Agent(name,
_notifier=self.notifier,
_heartbeat_interval=heartbeat)
# No database needed for this test
self.running = True
self.start()
def connect_agent(self, broker_url):
# broker_url = "user/passwd@hostname:port"
self.conn = qpid.messaging.Connection(broker_url.host,
broker_url.port,
broker_url.user,
broker_url.password)
self.conn.connect()
self.agent.set_connection(self.conn)
def disconnect_agent(self, timeout):
if self.conn:
self.agent.remove_connection(timeout)
def shutdown_agent(self, timeout):
self.agent.destroy(timeout)
def stop(self):
self.running = False
self.notifier.indication() # hmmm... collide with daemon???
self.join(10)
if self.isAlive():
logging.error("AGENT DID NOT TERMINATE AS EXPECTED!!!")
def run(self):
while self.running:
self.notifier.wait_for_work(None)
wi = self.agent.get_next_workitem(timeout=0)
while wi is not None:
logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
self.agent.release_workitem(wi)
wi = self.agent.get_next_workitem(timeout=0)
class BaseTest(unittest.TestCase):
def configure(self, config):
self.config = config
self.broker = config.broker
self.defines = self.config.defines
def setUp(self):
# one second agent indication interval
self.agent1 = _agentApp("agent1", 1)
self.agent1.connect_agent(self.broker)
self.agent2 = _agentApp("agent2", 1)
self.agent2.connect_agent(self.broker)
def tearDown(self):
if self.agent1:
self.agent1.shutdown_agent(10)
self.agent1.stop()
self.agent1 = None
if self.agent2:
self.agent2.shutdown_agent(10)
self.agent2.stop()
self.agent2 = None
def test_discover_all(self):
# create console
# enable agent discovery
# wait
# expect agent add for agent1 and agent2
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
self.conn = qpid.messaging.Connection(self.broker.host,
self.broker.port,
self.broker.user,
self.broker.password)
self.conn.connect()
self.console.addConnection(self.conn)
self.console.enable_agent_discovery()
agent1_found = agent2_found = False
wi = self.console.get_next_workitem(timeout=3)
while wi and not (agent1_found and agent2_found):
if wi.get_type() == wi.AGENT_ADDED:
agent = wi.get_params().get("agent")
if not agent or not isinstance(agent, qmf2.console.Agent):
self.fail("Unexpected workitem from agent")
else:
if agent.get_name() == "agent1":
agent1_found = True
elif agent.get_name() == "agent2":
agent2_found = True
else:
self.fail("Unexpected agent name received: %s" %
agent.get_name())
if agent1_found and agent2_found:
break;
wi = self.console.get_next_workitem(timeout=3)
self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
self.console.destroy(10)
def test_discover_one(self):
# create console
# enable agent discovery, filter for agent1 only
# wait until timeout
# expect agent add for agent1 only
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
self.conn = qpid.messaging.Connection(self.broker.host,
self.broker.port,
self.broker.user,
self.broker.password)
self.conn.connect()
self.console.addConnection(self.conn)
query = qmf2.common.QmfQuery.create_predicate(
qmf2.common.QmfQuery.TARGET_AGENT,
qmf2.common.QmfQueryPredicate({qmf2.common.QmfQuery.CMP_EQ:
[qmf2.common.QmfQuery.KEY_AGENT_NAME, "agent1"]}))
self.console.enable_agent_discovery(query)
agent1_found = agent2_found = False
wi = self.console.get_next_workitem(timeout=3)
while wi:
if wi.get_type() == wi.AGENT_ADDED:
agent = wi.get_params().get("agent")
if not agent or not isinstance(agent, qmf2.console.Agent):
self.fail("Unexpected workitem from agent")
else:
if agent.get_name() == "agent1":
agent1_found = True
elif agent.get_name() == "agent2":
agent2_found = True
else:
self.fail("Unexpected agent name received: %s" %
agent.get_name())
wi = self.console.get_next_workitem(timeout=2)
self.assertTrue(agent1_found and not agent2_found, "Unexpected agent discovered")
self.console.destroy(10)
def test_heartbeat(self):
# create console with 2 sec agent timeout
# enable agent discovery, find all agents
# stop agent1, expect timeout notification
# stop agent2, expect timeout notification
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=2)
self.conn = qpid.messaging.Connection(self.broker.host,
self.broker.port,
self.broker.user,
self.broker.password)
self.conn.connect()
self.console.addConnection(self.conn)
self.console.enable_agent_discovery()
agent1_found = agent2_found = False
wi = self.console.get_next_workitem(timeout=4)
while wi and not (agent1_found and agent2_found):
if wi.get_type() == wi.AGENT_ADDED:
agent = wi.get_params().get("agent")
if not agent or not isinstance(agent, qmf2.console.Agent):
self.fail("Unexpected workitem from agent")
else:
if agent.get_name() == "agent1":
agent1_found = True
elif agent.get_name() == "agent2":
agent2_found = True
else:
self.fail("Unexpected agent name received: %s" %
agent.get_name())
if agent1_found and agent2_found:
break;
wi = self.console.get_next_workitem(timeout=4)
self.assertTrue(agent1_found and agent2_found, "All agents not discovered")
# now kill agent1 and wait for expiration
agent1 = self.agent1
self.agent1 = None
agent1.shutdown_agent(10)
agent1.stop()
wi = self.console.get_next_workitem(timeout=4)
while wi is not None:
if wi.get_type() == wi.AGENT_DELETED:
agent = wi.get_params().get("agent")
if not agent or not isinstance(agent, qmf2.console.Agent):
self.fail("Unexpected workitem from agent")
else:
if agent.get_name() == "agent1":
agent1_found = False
else:
self.fail("Unexpected agent_deleted received: %s" %
agent.get_name())
if not agent1_found:
break;
wi = self.console.get_next_workitem(timeout=4)
self.assertFalse(agent1_found, "agent1 did not delete!")
# now kill agent2 and wait for expiration
agent2 = self.agent2
self.agent2 = None
agent2.shutdown_agent(10)
agent2.stop()
wi = self.console.get_next_workitem(timeout=4)
while wi is not None:
if wi.get_type() == wi.AGENT_DELETED:
agent = wi.get_params().get("agent")
if not agent or not isinstance(agent, qmf2.console.Agent):
self.fail("Unexpected workitem from agent")
else:
if agent.get_name() == "agent2":
agent2_found = False
else:
self.fail("Unexpected agent_deleted received: %s" %
agent.get_name())
if not agent2_found:
break;
wi = self.console.get_next_workitem(timeout=4)
self.assertFalse(agent2_found, "agent2 did not delete!")
self.console.destroy(10)
def test_find_agent(self):
# create console
# do not enable agent discovery
# find agent1, expect success
# find agent-none, expect failure
# find agent2, expect success
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier)
self.conn = qpid.messaging.Connection(self.broker.host,
self.broker.port,
self.broker.user,
self.broker.password)
self.conn.connect()
self.console.addConnection(self.conn)
agent1 = self.console.find_agent("agent1", timeout=3)
self.assertTrue(agent1 and agent1.get_name() == "agent1")
no_agent = self.console.find_agent("agent-none", timeout=3)
self.assertTrue(no_agent == None)
agent2 = self.console.find_agent("agent2", timeout=3)
self.assertTrue(agent2 and agent2.get_name() == "agent2")
self.console.removeConnection(self.conn, 10)
self.console.destroy(10)