| #!/usr/bin/env python |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil |
| from qpid.messaging import Message, NotFound, ConnectionError, Connection |
| from brokertest import * |
| from threading import Thread, Lock, Condition |
| from logging import getLogger, WARN, ERROR, DEBUG |
| |
| |
| log = getLogger("qpid.ha-tests") |
| |
| class HaBroker(Broker): |
| def __init__(self, test, args=[], broker_url=None, **kwargs): |
| assert BrokerTest.ha_lib, "Cannot locate HA plug-in" |
| args=["--load-module", BrokerTest.ha_lib, |
| # FIXME aconway 2012-02-13: workaround slow link failover. |
| "--link-maintenace-interval=0.1", |
| "--ha-enable=yes"] |
| if broker_url: args += [ "--ha-broker-url", broker_url ] |
| Broker.__init__(self, test, args, **kwargs) |
| |
| def promote(self): |
| assert os.system("qpid-ha-tool --promote %s"%(self.host_port())) == 0 |
| |
| def set_client_url(self, url): |
| assert os.system( |
| "qpid-ha-tool --client-addresses=%s %s"%(url,self.host_port())) == 0 |
| |
| def set_broker_url(self, url): |
| assert os.system( |
| "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0 |
| |
| |
| class ShortTests(BrokerTest): |
| """Short HA functionality tests.""" |
| |
| # Wait for an address to become valid. |
| def wait(self, session, address): |
| def check(): |
| try: |
| session.sender(address) |
| return True |
| except NotFound: return False |
| assert retry(check), "Timed out waiting for %s"%(address) |
| |
| # Wait for address to become valid on a backup broker. |
| def wait_backup(self, backup, address): |
| bs = self.connect_admin(backup).session() |
| self.wait(bs, address) |
| bs.connection.close() |
| |
| # Combines wait_backup and assert_browse_retry |
| def assert_browse_backup(self, backup, queue, expected, **kwargs): |
| bs = self.connect_admin(backup).session() |
| self.wait(bs, queue) |
| self.assert_browse_retry(bs, queue, expected, **kwargs) |
| bs.connection.close() |
| |
| def assert_missing(self, session, address): |
| try: |
| session.receiver(address) |
| self.fail("Should not have been replicated: %s"%(address)) |
| except NotFound: pass |
| |
| def connect_admin(self, backup, **kwargs): |
| """Connect to a backup broker as an admin connection""" |
| return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) |
| |
| def test_replication(self): |
| """Test basic replication of configuration and messages before and |
| after backup has connected""" |
| |
| def queue(name, replicate): |
| return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) |
| |
| def exchange(name, replicate, bindq): |
| return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq) |
| def setup(p, prefix, primary): |
| """Create config, send messages on the primary p""" |
| s = p.sender(queue(prefix+"q1", "messages")) |
| for m in ["a", "b", "1"]: s.send(Message(m)) |
| # Test replication of dequeue |
| self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") |
| p.acknowledge() |
| p.sender(queue(prefix+"q2", "configuration")).send(Message("2")) |
| p.sender(queue(prefix+"q3", "none")).send(Message("3")) |
| p.sender(exchange(prefix+"e1", "messages", prefix+"q1")).send(Message("4")) |
| p.sender(exchange(prefix+"e2", "messages", prefix+"q2")).send(Message("5")) |
| # Test unbind |
| p.sender(queue(prefix+"q4", "messages")).send(Message("6")) |
| s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4")) |
| s3.send(Message("7")) |
| # Use old connection to unbind |
| us = primary.connect_old().session(str(qpid.datatypes.uuid4())) |
| us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4") |
| p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped |
| # Need a marker so we can wait till sync is done. |
| p.sender(queue(prefix+"x", "configuration")) |
| |
| def verify(b, prefix, p): |
| """Verify setup was replicated to backup b""" |
| |
| # Wait for configuration to replicate. |
| self.wait(b, prefix+"x"); |
| self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"]) |
| |
| self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b") |
| p.acknowledge() |
| self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) |
| |
| self.assert_browse_retry(b, prefix+"q2", []) # configuration only |
| self.assert_missing(b, prefix+"q3") |
| b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all |
| self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) |
| b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration |
| self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) |
| |
| b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind. |
| self.assert_browse_retry(b, prefix+"q4", ["6","7"]) |
| |
| primary = HaBroker(self, name="primary") |
| primary.promote() |
| p = primary.connect().session() |
| |
| # Create config, send messages before starting the backup, to test catch-up replication. |
| setup(p, "1", primary) |
| backup = HaBroker(self, name="backup", broker_url=primary.host_port()) |
| # Create config, send messages after starting the backup, to test steady-state replication. |
| setup(p, "2", primary) |
| |
| # Verify the data on the backup |
| b = self.connect_admin(backup).session() |
| verify(b, "1", p) |
| verify(b, "2", p) |
| # Test a series of messages, enqueue all then dequeue all. |
| s = p.sender(queue("foo","messages")) |
| self.wait(b, "foo") |
| msgs = [str(i) for i in range(10)] |
| for m in msgs: s.send(Message(m)) |
| self.assert_browse_retry(p, "foo", msgs) |
| self.assert_browse_retry(b, "foo", msgs) |
| r = p.receiver("foo") |
| for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content) |
| p.acknowledge() |
| self.assert_browse_retry(p, "foo", []) |
| self.assert_browse_retry(b, "foo", []) |
| |
| # Another series, this time verify each dequeue individually. |
| for m in msgs: s.send(Message(m)) |
| self.assert_browse_retry(p, "foo", msgs) |
| self.assert_browse_retry(b, "foo", msgs) |
| for i in range(len(msgs)): |
| self.assertEqual(msgs[i], r.fetch(timeout=0).content) |
| p.acknowledge() |
| self.assert_browse_retry(p, "foo", msgs[i+1:]) |
| self.assert_browse_retry(b, "foo", msgs[i+1:]) |
| |
| def qpid_replicate(self, value="messages"): |
| return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value |
| |
| def test_sync(self): |
| def queue(name, replicate): |
| return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) |
| primary = HaBroker(self, name="primary") |
| primary.promote() |
| p = primary.connect().session() |
| s = p.sender(queue("q","messages")) |
| for m in [str(i) for i in range(0,10)]: s.send(m) |
| s.sync() |
| backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) |
| for m in [str(i) for i in range(10,20)]: s.send(m) |
| s.sync() |
| backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) |
| for m in [str(i) for i in range(20,30)]: s.send(m) |
| s.sync() |
| |
| msgs = [str(i) for i in range(30)] |
| b1 = self.connect_admin(backup1).session() |
| self.wait(b1, "q"); |
| self.assert_browse_retry(b1, "q", msgs) |
| b2 = self.connect_admin(backup2).session() |
| self.wait(b2, "q"); |
| self.assert_browse_retry(b2, "q", msgs) |
| |
| def test_send_receive(self): |
| """Verify sequence numbers of messages sent by qpid-send""" |
| primary = HaBroker(self, name="primary") |
| primary.promote() |
| backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) |
| backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) |
| sender = self.popen( |
| ["qpid-send", |
| "--broker", primary.host_port(), |
| "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), |
| "--messages=1000", |
| "--content-string=x" |
| ]) |
| receiver = self.popen( |
| ["qpid-receive", |
| "--broker", primary.host_port(), |
| "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), |
| "--messages=990", |
| "--timeout=10" |
| ]) |
| try: |
| self.assertEqual(sender.wait(), 0) |
| self.assertEqual(receiver.wait(), 0) |
| expect = [long(i) for i in range(991, 1001)] |
| sn = lambda m: m.properties["sn"] |
| self.assert_browse_retry(self.connect_admin(backup1).session(), "q", expect, transform=sn) |
| self.assert_browse_retry(self.connect_admin(backup2).session(), "q", expect, transform=sn) |
| except: |
| print self.browse(primary.connect().session(), "q", transform=sn) |
| print self.browse(self.connect_admin(backup1).session(), "q", transform=sn) |
| print self.browse(self.connect_admin(backup2).session(), "q", transform=sn) |
| raise |
| |
| def test_failover_python(self): |
| """Verify that backups rejects connections and that fail-over works in python client""" |
| getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages |
| primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) |
| primary.promote() |
| backup = HaBroker(self, name="backup", broker_url=primary.host_port()) |
| # Check that backup rejects normal connections |
| try: |
| backup.connect().session() |
| self.fail("Expected connection to backup to fail") |
| except ConnectionError: pass |
| # Check that admin connections are allowed to backup. |
| self.connect_admin(backup).close() |
| |
| # Test discovery: should connect to primary after reject by backup |
| c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) |
| s = c.session() |
| sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate())) |
| self.wait_backup(backup, "q") |
| sender.send("foo") |
| primary.kill() |
| assert retry(lambda: not is_running(primary.pid)) |
| backup.promote() |
| self.assert_browse_retry(s, "q", ["foo"]) |
| c.close() |
| |
| def test_failover_cpp(self): |
| """Verify that failover works in the C++ client.""" |
| primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) |
| primary.promote() |
| backup = HaBroker(self, name="backup", broker_url=primary.host_port()) |
| url="%s,%s"%(primary.host_port(), backup.host_port()) |
| primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) |
| self.wait_backup(backup, "q") |
| |
| sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) |
| receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False) |
| receiver.start() |
| sender.start() |
| self.wait_backup(backup, "q") |
| assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru |
| |
| primary.kill() |
| assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die |
| backup.promote() |
| n = receiver.received # Make sure we are still running |
| assert retry(lambda: receiver.received > n + 10) |
| sender.stop() |
| receiver.stop() |
| |
| def test_backup_failover(self): |
| brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) |
| for name in ["a","b","c"] ] |
| url = ",".join([b.host_port() for b in brokers]) |
| for b in brokers: b.set_broker_url(url) |
| brokers[0].promote() |
| brokers[0].connect().session().sender( |
| "q;{create:always,%s}"%(self.qpid_replicate())).send("a") |
| for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) |
| brokers[0].kill() |
| brokers[2].promote() # c must fail over to b. |
| brokers[2].connect().session().sender("q").send("b") |
| self.assert_browse_backup(brokers[1], "q", ["a","b"]) |
| for b in brokers[1:]: b.kill() |
| |
| class LongTests(BrokerTest): |
| """Tests that can run for a long time if -DDURATION=<minutes> is set""" |
| |
| def duration(self): |
| d = self.config.defines.get("DURATION") |
| if d: return float(d)*60 |
| else: return 3 # Default is to be quick |
| |
| |
| def disable_test_failover(self): |
| """Test failover with continuous send-receive""" |
| # FIXME aconway 2012-02-03: fails due to dropped messages, |
| # known issue: sending messages to new primary before |
| # backups are ready. |
| |
| # Start a cluster, all members will be killed during the test. |
| brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) |
| for name in ["ha0","ha1","ha2"] ] |
| url = ",".join([b.host_port() for b in brokers]) |
| for b in brokers: b.set_broker_url(url) |
| brokers[0].promote() |
| |
| # Start sender and receiver threads |
| sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False) |
| receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) |
| receiver.start() |
| sender.start() |
| # Wait for sender & receiver to get up and running |
| assert retry(lambda: receiver.received > 100) |
| # Kill and restart brokers in a cycle: |
| endtime = time.time() + self.duration() |
| i = 0 |
| while time.time() < endtime or i < 3: # At least 3 iterations |
| sender.sender.assert_running() |
| receiver.receiver.assert_running() |
| port = brokers[i].port() |
| brokers[i].kill() |
| brokers.append( |
| HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port, |
| expect=EXPECT_EXIT_FAIL)) |
| i += 1 |
| brokers[i].promote() |
| n = receiver.received # Verify we're still running |
| def enough(): |
| receiver.check() # Verify no exceptions |
| return receiver.received > n + 100 |
| assert retry(enough, timeout=5) |
| |
| sender.stop() |
| receiver.stop() |
| for b in brokers[i:]: b.kill() |
| |
| if __name__ == "__main__": |
| shutil.rmtree("brokertest.tmp", True) |
| os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) |