| #!/usr/bin/env python |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| This module contains tests for HA functionality that requires a store. |
| It is not included as part of "make check" since it will not function |
| without a store. Currently it can be run from a build of the message |
| store. |
| """ |
| |
| import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random |
| import traceback |
| from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty |
| from qpid.datatypes import uuid4 |
| from brokertest import * |
| from ha_test import * |
| from threading import Thread, Lock, Condition |
| from logging import getLogger, WARN, ERROR, DEBUG, INFO |
| from qpidtoollibs import BrokerAgent |
| from uuid import UUID |
| |
| |
| class StoreTests(BrokerTest): |
| """Test for HA with persistence.""" |
| |
| def test_store_recovery(self): |
| """Verify basic store and recover functionality""" |
| cluster = HaCluster(self, 2) |
| sn = cluster[0].connect().session() |
| s = sn.sender("qq;{create:always,node:{durable:true}}") |
| sk = sn.sender("xx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:xx,key:k,queue:qq}]}}") |
| s.send(Message("foo", durable=True)) |
| s.send(Message("bar", durable=True)) |
| sk.send(Message("baz", durable=True)) |
| r = cluster[0].connect().session().receiver("qq") |
| self.assertEqual(r.fetch().content, "foo") |
| r.session.acknowledge() |
| # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush |
| # the dequeue operation on qq. |
| s.send(Message("flush", durable=True)) |
| |
| def verify(broker, x_count): |
| sn = broker.connect().session() |
| assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"]) |
| sn.sender("xx/k").send(Message("x", durable=True)) |
| assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"]) |
| |
| verify(cluster[0], 0) |
| cluster.bounce(0, promote_next=False) |
| cluster[0].promote() |
| cluster[0].wait_status("active") |
| verify(cluster[0], 1) |
| cluster.kill(0, promote_next=False) |
| cluster[1].promote() |
| cluster[1].wait_status("active") |
| verify(cluster[1], 2) |
| cluster.bounce(1, promote_next=False) |
| cluster[1].promote() |
| cluster[1].wait_status("active") |
| verify(cluster[1], 3) |
| |
| def test_catchup_store(self): |
| """Verify that a backup erases queue data from store recovery before |
| doing catch-up from the primary.""" |
| cluster = HaCluster(self, 2) |
| sn = cluster[0].connect().session() |
| s1 = sn.sender("q1;{create:always,node:{durable:true}}") |
| for m in ["foo","bar"]: s1.send(Message(m, durable=True)) |
| s2 = sn.sender("q2;{create:always,node:{durable:true}}") |
| sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}") |
| sk2.send(Message("hello", durable=True)) |
| # Wait for backup to catch up. |
| cluster[1].assert_browse_backup("q1", ["foo","bar"]) |
| cluster[1].assert_browse_backup("q2", ["hello"]) |
| |
| # Make changes that the backup doesn't see |
| cluster.kill(1, promote_next=False) |
| time.sleep(1) # FIXME aconway 2012-09-25: |
| r1 = cluster[0].connect().session().receiver("q1") |
| for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m) |
| r1.session.acknowledge() |
| for m in ["x","y","z"]: s1.send(Message(m, durable=True)) |
| # Use old connection to unbind |
| us = cluster[0].connect_old().session(str(uuid4())) |
| us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2") |
| us.exchange_bind(exchange="ex", binding_key="k1", queue="q1") |
| # Restart both brokers from store to get inconsistent sequence numbering. |
| cluster.bounce(0, promote_next=False) |
| cluster[0].promote() |
| cluster[0].wait_status("active") |
| cluster.restart(1) |
| cluster[1].wait_status("ready") |
| |
| # Verify state |
| cluster[0].assert_browse("q1", ["x","y","z"]) |
| cluster[1].assert_browse_backup("q1", ["x","y","z"]) |
| sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over! |
| sn.sender("ex/k1").send("boo") |
| cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"]) |
| cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"]) |
| sn.sender("ex/k2").send("hoo") # q2 was unbound so this should be dropped. |
| sn.sender("q2").send("end") # mark the end of the queue for assert_browse |
| cluster[0].assert_browse("q2", ["hello", "end"]) |
| cluster[1].assert_browse_backup("q2", ["hello", "end"]) |
| |
| if __name__ == "__main__": |
| shutil.rmtree("brokertest.tmp", True) |
| qpid_ha = os.getenv("QPID_HA_EXEC") |
| if qpid_ha and os.path.exists(qpid_ha): |
| os.execvp("qpid-python-test", |
| ["qpid-python-test", "-m", "ha_store_tests"] + sys.argv[1:]) |
| else: |
| print "Skipping ha_store_tests, %s not available"%(qpid_ha) |