blob: d2de384f087559064c49e84dbcc09aa7396659b5 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os, signal, sys, time, imp, re, subprocess, glob, random, logging
import cluster_test_logs
from qpid import datatypes, messaging
from brokertest import *
from qpid.harness import Skipped
from qpid.messaging import Message, Empty, Disposition, REJECTED, util
from threading import Thread, Lock, Condition
from logging import getLogger
from itertools import chain
from tempfile import NamedTemporaryFile
# Module-level logger for the cluster tests.
log = getLogger("qpid.cluster_tests")
# Note: brokers that shut themselves down due to critical error during
# normal operation will still have an exit code of 0. Brokers that
# shut down because of an error found during initialize will exit with
# a non-0 code. Hence the apparently inconsistent use of EXPECT_EXIT_OK
# and EXPECT_EXIT_FAIL in some of the tests below.
# TODO aconway 2010-03-11: resolve this - ideally any exit due to an error
# should give non-0 exit status.
# Import scripts as modules
# The script path is taken from the QPID_CLUSTER_EXEC setting via checkenv().
# NOTE(review): import_script and checkenv are presumably supplied by the
# brokertest star-import above - confirm.
qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
def readfile(filename):
    """Return the content of the file named filename as a string.

    The file is closed before returning, even if reading raises.
    """
    # open() is used rather than the Python 2-only file() builtin; the two
    # are equivalent on Python 2 and open() also exists on Python 3.
    f = open(filename)
    try:
        return f.read()
    finally:
        f.close()
class ShortTests(BrokerTest):
"""Short cluster functionality tests."""
def test_message_replication(self):
    """Messages sent to one member can be fetched from the other members,
    including a member started after the messages were sent."""
    def expect_next(broker, expected):
        # Fetch and acknowledge one message from "q" on broker and check
        # its content, then drop the connection.
        session = broker.connect().session()
        received = session.receiver("q", capacity=1).fetch(timeout=1)
        session.acknowledge()
        self.assertEqual(expected, received.content)
        session.connection.close()

    # Two-member cluster; publish "x" then "y" through member 0.
    cluster = self.cluster(2)
    producer = cluster[0].connect().session()
    for body in ("x", "y"):
        producer.sender("q; {create:always}").send(Message(body))
    producer.connection.close()
    # The first message is consumed from member 1.
    expect_next(cluster[1], "x")
    # The second message is consumed from a freshly started member 2.
    expect_next(cluster.start(), "y")
def test_store_direct_update_match(self):
    """Brokers must store an identical message whether it arrives directly
    from a client or through a cluster update: the store dump written by
    the first broker is compared with the one written by a later joiner."""
    direct_dump = "direct.dump"
    updatee_dump = "updatee.dump"
    cluster = self.cluster(0, args=["--load-module", self.test_store_lib])
    cluster.start(args=["--test-store-dump", direct_dump])
    # A message carrying a variety of headers.
    headers_msg = Message(durable=True, content="foobar",
                          subject="subject",
                          reply_to="reply_to",
                          properties={"n":10})
    cluster[0].send_message("q", headers_msg)
    # Messages of many different sizes.
    for size in range(0,10000,100):
        cluster[0].send_message("q", Message(content="x"*size, durable=True))
    # A message routed through a named exchange, using the old client API.
    old_conn = cluster[0].connect_old()
    old_ssn = old_conn.session(str(qpid.datatypes.uuid4()))
    old_ssn.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q")
    delivery = old_ssn.delivery_properties(routing_key="foo", delivery_mode=2)
    old_ssn.message_transfer(
        destination="amq.direct",
        message=qpid.datatypes.Message(delivery, "content"))
    # Messages with a TTL and differing header/property combinations.
    for extra in ({}, {"properties": {}}, {"properties": {"x":10}}):
        cluster[0].send_message("q", Message(durable=True, ttl=100000, **extra))
    # Bring up a new member (which receives an update) and compare the dumps.
    cluster.start(args=["--test-store-dump", updatee_dump])
    assert readfile(direct_dump) == readfile(updatee_dump)
    for dump in (direct_dump, updatee_dump):
        os.remove(dump)
def test_sasl(self):
    """Exercise SASL authentication and ACL enforcement in a cluster."""
    sasl_config = os.path.join(self.rootdir, "sasl_config")
    acl_path = os.path.join(os.getcwd(), "policy.acl")
    # Write the ACL policy. The cluster user (zag) must be allowed to
    # publish to the credentials exchange.
    acl_file = open(acl_path, "w")
    acl_file.write("""
acl allow zag@QPID publish exchange name=qpid.cluster-credentials
acl allow zig@QPID all all
acl deny all all
""")
    acl_file.close()
    broker_args = ["--auth", "yes",
                   "--sasl-config", sasl_config,
                   "--load-module", os.getenv("ACL_LIB"),
                   "--acl-file", acl_path,
                   "--cluster-username=zag",
                   "--cluster-password=zag",
                   "--cluster-mechanism=PLAIN"
                   ]
    cluster = self.cluster(1, args=broker_args)

    def check_brokers_running():
        # ready() is called on every member to make sure it is still up.
        for member in cluster:
            member.ready()

    def expect_login_failure(username, password):
        # Connecting with these credentials must raise ConnectionError
        # and must leave every broker running.
        try:
            cluster[0].connect(username=username, password=password).close()
            self.fail("Expected exception")
        except messaging.exceptions.ConnectionError:
            pass
        check_brokers_running()

    # Valid user/password: create a queue through the first node.
    conn = cluster[0].connect(username="zig", password="zig")
    conn.session().sender("ziggy;{create:always,node:{x-declare:{exclusive:true}}}")
    conn.close()
    cluster.start() # Start second node.
    # The queue must also exist on the second node.
    conn = cluster[1].connect(username="zig", password="zig")
    conn.session().receiver("ziggy;{assert:always}")
    conn.close()
    check_brokers_running()
    # Valid user with a bad password, then an unknown user.
    expect_login_failure("zig", "foo")
    expect_login_failure("foo", "bar")
    # An action the ACL does not allow for zag.
    conn = cluster[0].connect(username="zag", password="zag")
    try:
        ssn = conn.session()
        ssn.sender("zaggy;{create:always}")
        ssn.close()
        self.fail("Expected exception")
    except messaging.exceptions.UnauthorizedAccess:
        pass
    # The rejected queue must not have been created on the other node.
    conn = cluster[1].connect(username="zig", password="zig")
    try:
        ssn = conn.session()
        ssn.sender("zaggy;{assert:always}")
        ssn.close()
        self.fail("Expected exception")
    except messaging.exceptions.NotFound:
        pass
def test_sasl_join_good(self):
    """A broker configured with a valid cluster username/password can join
    a cluster that requires SASL authentication."""
    broker_args = ["--auth", "yes",
                   "--sasl-config", os.path.join(self.rootdir, "sasl_config"),
                   "--cluster-username=zig",
                   "--cluster-password=zig",
                   "--cluster-mechanism=PLAIN"
                   ]
    cluster = self.cluster(1, args=broker_args)
    # Second member joins using the same (valid) cluster credentials.
    cluster.start()
    # A client can then connect to the new member.
    c = cluster[1].connect(username="zag", password="zag", mechanism="PLAIN")
def test_sasl_join_bad_password(self):
    """A broker configured with an invalid cluster password must fail to
    join and log an authentication failure."""
    sasl_config = os.path.join(self.rootdir, "sasl_config")
    broker_args = ["--auth", "yes",
                   "--sasl-config", sasl_config,
                   "--cluster-username=zig",
                   "--cluster-password=bad",
                   "--cluster-mechanism=PLAIN"
                   ]
    cluster = self.cluster(1, args=broker_args)
    # The joining broker is expected to exit with a failure status.
    cluster.start(wait=False, expect=EXPECT_EXIT_FAIL)
    expected_log = "critical Unexpected error: connection-forced: Authentication failed"
    assert cluster[1].log_contains(expected_log)
def test_sasl_join_wrong_user(self):
    """A broker joining with a valid user that is not the cluster user must
    fail and log an unauthorized-access error."""
    def cluster_credentials(user):
        # Broker options selecting `user` as both cluster username and password.
        return ["--cluster-username=%s" % user,
                "--cluster-password=%s" % user,
                "--cluster-mechanism=PLAIN"
                ]

    sasl_config = os.path.join(self.rootdir, "sasl_config")
    cluster = self.cluster(0, args=["--auth", "yes",
                                    "--sasl-config", sasl_config])
    # First member establishes zig as the cluster user.
    cluster.start(args=cluster_credentials("zig"))
    # Second member tries to join as zag and is expected to exit with failure.
    cluster.start(wait=False, expect=EXPECT_EXIT_FAIL,
                  args=cluster_credentials("zag"))
    expected_log = "critical Unexpected error: unauthorized-access: unauthorized-access: Unauthorized user zag@QPID for qpid.cluster-credentials, should be zig"
    assert cluster[1].log_contains(expected_log)
def test_user_id_update(self):
    """The user-id of an open session must be carried over to members that
    join the cluster later."""
    sasl_config = os.path.join(self.rootdir, "sasl_config")
    broker_args = ["--auth", "yes", "--sasl-config", sasl_config,
                   "--cluster-mechanism=ANONYMOUS"]
    cluster = self.cluster(1, args=broker_args)
    connection = cluster[0].connect(username="zig", password="zig")
    producer = connection.session().sender("q;{create:always}")
    # One message before the new broker starts ...
    producer.send(Message("x", user_id="zig"))
    cluster.start()
    # ... and one after it has started.
    producer.send(Message("y", user_id="zig"))
    # Both messages must be on the queue, one read from each broker.
    self.assertEqual("x", cluster[0].get_message("q").content)
    self.assertEqual("y", cluster[1].get_message("q").content)
def test_link_events(self):
    """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
    # Management information is published every second.
    broker_args = ["--mgmt-pub-interval", 1]
    source = self.cluster(1, broker_args)[0]
    destination = self.cluster(1, broker_args)[0]
    # Event printer attached to the first broker; expected to keep running.
    event_printer = self.popen(["qpid-printevents", source.host_port()], EXPECT_RUNNING)
    # Add a route between the two brokers.
    route_cmd = ["qpid-route", "route", "add",
                 source.host_port(), destination.host_port(),
                 "amq.fanout", "key"
                 ]
    route_add = self.popen(route_cmd, EXPECT_EXIT_OK)
    # Retry until the link-up event shows in the event printer's output.
    retry(lambda: find_in_file("brokerLinkUp", event_printer.outfile("out")))
    source.ready()
    destination.ready()
def test_queue_cleaner(self):
    """ Regression test to ensure that cleanup of expired messages works correctly """
    cluster = self.cluster(2, args=["--queue-purge-interval", 3])
    producer_ssn = cluster[0].connect().session()
    lvq = producer_ssn.sender("my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue':1}}}}")
    # Send messages 1..9, all with a 0.1 TTL and the same LVQ key, so that
    # they expire and get cleaned up.
    for i in range(1, 10):
        expiring = Message("message-%s" % i)
        expiring.properties["qpid.LVQ_key"] = "a"
        expiring.ttl = 0.1
        lvq.send(expiring)
    # Give the queue cleaner time to run.
    time.sleep(3)
    # Check all is well by sending and receiving a message that does not expire.
    survivor = Message("non-expiring")
    survivor.properties["qpid.LVQ_key"] = "b"
    lvq.send(survivor)
    producer_ssn.connection.close()
    consumer_ssn = cluster[1].connect().session()
    received = consumer_ssn.receiver("my-lvq", capacity=1).fetch(timeout=1)
    consumer_ssn.acknowledge()
    self.assertEqual("non-expiring", received.content)
    consumer_ssn.connection.close()
    # Every broker must still be running.
    for member in cluster:
        member.ready()
def test_amqfailover_visible(self):
    """Verify that the amq.failover exchange can be seen by
    QMF-based tools - regression test for BZ615300."""
    broker1 = self.cluster(1)[0]
    # A second single-member cluster is started as in the original test,
    # although it is not queried below.
    broker2 = self.cluster(1)[0]
    # List the exchanges of broker1 with qpid-stat and capture its stdout.
    qs = subprocess.Popen(["qpid-stat", "-e", broker1.host_port()], stdout=subprocess.PIPE)
    out = qs.communicate()[0]
    # Membership test instead of find() > 0, which would wrongly fail if the
    # name appeared at offset 0 of the output.
    assert "amq.failover" in out
def evaluate_address(self, session, address):
    """Open and immediately close a receiver on address, so that only the
    side effects of evaluating the address remain."""
    session.receiver(address).close()
def test_expire_fanout(self):
    """Regression test for QPID-2874: Clustered broker crashes in assertion in
    cluster/ExpiryPolicy.cpp.
    Caused by a fan-out message being updated as separate messages"""
    cluster = self.cluster(1)
    session0 = cluster[0].connect().session()
    # Create 2 queues bound to fanout exchange.
    queues = ["q1", "q2"]
    for q in queues:
        self.evaluate_address(
            session0,
            "%s;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:%s}]}}" % (q, q))
    # Send a fanout message with a long timeout
    s = session0.sender("amq.fanout")
    s.send(Message("foo", ttl=100), sync=False)
    # Start a new member, check the messages
    cluster.start()
    session1 = cluster[1].connect().session()
    # Browse every queue on the new member (previously only "q1" was
    # checked, once per loop iteration, so q2 was never verified).
    for q in queues:
        self.assert_browse(session1, q, ["foo"])
def test_route_update(self):
    """Regression test for https://issues.apache.org/jira/browse/QPID-2982
    Links and bridges associated with routes were not replicated on update.
    This meant extra management objects and caused an exit if a management
    client was attached.
    """
    broker_args = ["--mgmt-pub-interval=1","--log-enable=trace+:management"]
    # The first broker of cluster0 is killed at the end of the test.
    cluster0 = self.cluster(1, args=broker_args)
    cluster1 = self.cluster(1, args=broker_args)
    route_cmd = ["qpid-route", "route", "add", cluster0[0].host_port(),
                 cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"]
    assert 0 == subprocess.call(route_cmd)
    cluster0.start()
    # Drive qpid-tool against cluster0[0] and scan its output for a link object.
    link_pattern = re.compile("org.apache.qpid.broker.*link")
    qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()],
                                 stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    class Scanner(Thread):
        """Reads qpid-tool output and sets found when the pattern matches."""
        def __init__(self):
            self.found = False
            Thread.__init__(self)

        def run(self):
            for line in qpid_tool.stdout:
                if link_pattern.search(line):
                    self.found = True
                    return

    scanner = Scanner()
    scanner.start()
    # Allow up to 5 seconds for the scanner to see the expected output.
    deadline = time.time() + 5
    try:
        while not scanner.found and time.time() < deadline:
            qpid_tool.stdin.write("list\n") # Ask qpid-tool to list
            for member in cluster0:
                member.ready() # Raise if any brokers are down
    finally:
        qpid_tool.stdin.write("quit\n")
        qpid_tool.wait()
        scanner.join()
    assert scanner.found
    # Regression test for https://issues.apache.org/jira/browse/QPID-3235
    # Inconsistent stats when changing elder.
    # Force a change of elder by adding a member and killing the first one.
    cluster0.start()
    cluster0[0].expect = EXPECT_EXIT_FAIL # About to die.
    cluster0[0].kill()
    time.sleep(2) # Allow a management interval to pass.
    # The broker logs must be consistent with each other.
    cluster_test_logs.verify_logs()
def test_redelivered(self):
    """The redelivered flag must be set correctly on messages replayed
    after a broker is killed."""
    cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
    # Failover URL listing both brokers.
    url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port())
    queue = "my-queue"
    cluster[0].declare_queue(queue)
    send_cmd = ["qpid-send",
                "--broker", url,
                "--address", queue,
                "--sequence=true",
                "--send-eos=1",
                "--messages=100000",
                "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS)
                ]
    self.sender = self.popen(send_cmd)
    receive_cmd = ["qpid-receive",
                   "--broker", url,
                   "--address", queue,
                   "--ignore-duplicates",
                   "--check-redelivered",
                   "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
                   "--forever"
                   ]
    self.receiver = self.popen(receive_cmd)
    # Give the sender enough time to have some messages to replay.
    time.sleep(1)
    cluster[0].kill()
    # Both clients must run to completion before the second broker is killed.
    self.sender.wait()
    self.receiver.wait()
    cluster[1].kill()
class BlockedSend(Thread):
    """Thread that performs one synchronous send which is expected to block.

    start() checks that the send is still blocked after a short timeout;
    wait() is called later, when the send is expected to complete."""

    def __init__(self, sender, msg):
        self.sender = sender
        self.msg = msg
        self.blocked = True            # Cleared by run() once send() returns.
        self.condition = Condition()   # Guards and signals changes to blocked.
        self.timeout = 0.1 # Time to wait for expected results.
        Thread.__init__(self)

    def run(self):
        try:
            self.sender.send(self.msg, sync=True)
            self.condition.acquire()
            try:
                self.blocked = False
                self.condition.notify()
            finally:
                self.condition.release()
        except Exception:
            # Report the failure; blocked is left set.
            print("BlockedSend exception: %s" % sys.exc_info()[1])

    def start(self):
        Thread.start(self)
        time.sleep(self.timeout)
        assert self.blocked # Expected to block

    def assert_blocked(self):
        assert self.blocked

    def wait(self): # Now expecting to unblock
        cond = self.condition
        cond.acquire()
        try:
            # One bounded wait: either the send completed or we time out.
            if self.blocked:
                cond.wait(self.timeout)
                if self.blocked:
                    raise Exception("Timed out waiting for send to unblock")
        finally:
            cond.release()
        self.join()
def queue_flowlimit_test(self, brokers):
"""Verify that the queue's flowlimit configuration and state are
correctly replicated.
The brokers argument allows this test to run on single broker,
cluster of 2 pre-startd brokers or cluster where second broker
starts after queue is in flow control.
"""
# configure a queue with a specific flow limit on first broker
ssn0 = brokers.first().connect().session()
s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
brokers.first().startQmf()
q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
oid = q1.getObjectId()
self.assertEqual(q1.name, "flq")
self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
assert not q1.flowStopped
self.assertEqual(q1.flowStoppedCount, 0)
# fill the queue on one broker until flow control is active
for x in range(5): s0.send(Message(str(x)))
sender = ShortTests.BlockedSend(s0, Message(str(6)))
sender.start() # Tests that sender does block
# Verify the broker queue goes into a flowStopped state
deadline = time.time() + 1
while not q1.flowStopped and time.time() < deadline: q1.update()
assert q1.flowStopped
self.assertEqual(q1.flowStoppedCount, 1)
sender.assert_blocked() # Still blocked
# Now verify the both brokers in cluster have same configuration
brokers.second().startQmf()
qs = brokers.second().qmf_session.getObjects(_objectId=oid)
self.assertEqual(len(qs), 1)
q2 = qs[0]
self.assertEqual(q2.name, "flq")
self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
assert q2.flowStopped
self.assertEqual(q2.flowStoppedCount, 1)
# now drain the queue using a session to the other broker
ssn1 = brokers.second().connect().session()
r1 = ssn1.receiver("flq", capacity=6)
for x in range(4):
r1.fetch(timeout=0)
ssn1.acknowledge()
sender.wait() # Verify no longer blocked.
# and re-verify state of queue on both brokers
q1.update()
assert not q1.flowStopped
q2.update()
assert not q2.flowStopped
ssn0.connection.close()
ssn1.connection.close()
cluster_test_logs.verify_logs()
def test_queue_flowlimit(self):
    """Run the flow-limit test against a single standalone broker."""
    standalone = self.broker()

    class Brokers:
        # Both roles are played by the same standalone broker.
        def first(self):
            return standalone
        def second(self):
            return standalone

    self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_cluster(self):
    """Run the flow-limit test against two pre-started cluster members."""
    members = self.cluster(2)

    class Brokers:
        def first(self):
            return members[0]
        def second(self):
            return members[1]

    self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_cluster_join(self):
    """Run the flow-limit test with a second member that only joins the
    cluster when it is first asked for."""
    members = self.cluster(1)

    class Brokers:
        def first(self):
            return members[0]
        def second(self):
            # Start the second member lazily, on first request.
            if len(members) == 1:
                members.start()
            return members[1]

    self.queue_flowlimit_test(Brokers())
def test_queue_flowlimit_replicate(self):
    """ Verify that a queue which is in flow control BUT has drained BELOW
    the flow control 'stop' threshold, is correctly replicated when a new
    broker is added to the cluster.
    """
    class AsyncSender(Thread):
        """Runs qpid-send for a fixed number of messages in its own thread,
        so a blocked sender does not block the test itself.
        """
        def __init__(self, broker, address, count=1, size=4):
            Thread.__init__(self)
            self.daemon = True
            self.broker = broker
            self.queue = address
            self.count = count
            self.size = size
            self.done = False   # Set once qpid-send has exited.

        def run(self):
            command = ["qpid-send",
                       "--capacity=1",
                       "--content-size=%s" % self.size,
                       "--messages=%s" % self.count,
                       "--failover-updates",
                       "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
                       "--address=%s" % self.queue,
                       "--broker=%s" % self.broker.host_port()]
            self.sender = subprocess.Popen(command)
            self.sender.wait()
            self.done = True

    def flq_on(broker):
        # Management object for queue "flq" as seen by broker's QMF session.
        return [q for q in broker.qmf_session.getObjects(_class="queue") if q.name == "flq"][0]

    def drain(broker, count):
        # Run qpid-receive against broker for count messages and wait for it.
        receiver = subprocess.Popen(["qpid-receive",
                                     "--messages=%s" % count,
                                     "--timeout=1",
                                     "--print-content=no",
                                     "--failover-updates",
                                     "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
                                     "--ack-frequency=1",
                                     "--address=flq",
                                     "--broker=%s" % broker.host_port()])
        receiver.wait()

    cluster = self.cluster(2)
    # Create a queue with rather draconian flow control settings.
    ssn0 = cluster[0].connect().session()
    s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}")
    # Start the sending thread against broker[0], then wait until the queue
    # reports flow control on broker[1].
    sender = AsyncSender(cluster[0], "flq", count=110)
    sender.start()
    cluster[1].startQmf()
    q_obj = flq_on(cluster[1])
    deadline = time.time() + 10
    while not q_obj.flowStopped and time.time() < deadline:
        q_obj.update()
    assert q_obj.flowStopped
    assert not sender.done
    assert q_obj.msgDepth < 110
    # Drain enough messages on broker[1] to drop below the flow stop
    # threshold without relieving flow control.
    drain(cluster[1], 15)
    q_obj.update()
    assert q_obj.flowStopped
    assert not sender.done
    current_depth = q_obj.msgDepth
    # Add a new broker; the queue must be in flow control there too, with
    # the same depth.
    cluster.start()
    cluster[2].startQmf()
    q_obj = flq_on(cluster[2])
    assert q_obj.flowStopped
    assert q_obj.msgDepth == current_depth
    # Drain the queue on broker[2]; flow control must then be released.
    drain(cluster[2], 95)
    q_obj.update()
    assert not q_obj.flowStopped
    self.assertEqual(q_obj.msgDepth, 0)
    # The sender must have become unblocked and finished.
    sender.join(timeout=5)
    assert not sender.isAlive()
    assert sender.done
def test_blocked_queue_delete(self):
"""Verify that producers which are blocked on a queue due to flow
control are unblocked when that queue is deleted.
"""
cluster = self.cluster(2)
cluster[0].startQmf()
cluster[1].startQmf()
# configure a queue with a specific flow limit on first broker
ssn0 = cluster[0].connect().session()
s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
oid = q1.getObjectId()
self.assertEqual(q1.name, "flq")
self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
assert not q1.flowStopped
self.assertEqual(q1.flowStoppedCount, 0)
# fill the queue on one broker until flow control is active
for x in range(5): s0.send(Message(str(x)))
sender = ShortTests.BlockedSend(s0, Message(str(6)))
sender.start() # Tests that sender does block
# Verify the broker queue goes into a flowStopped state
deadline = time.time() + 1
while not q1.flowStopped and time.time() < deadline: q1.update()
assert q1.flowStopped
self.assertEqual(q1.flowStoppedCount, 1)
sender.assert_blocked() # Still blocked
# Now verify the both brokers in cluster have same configuration
qs = cluster[1].qmf_session.getObjects(_objectId=oid)
self.assertEqual(len(qs), 1)
q2 = qs[0]
self.assertEqual(q2.name, "flq")
self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
assert q2.flowStopped
self.assertEqual(q2.flowStoppedCount, 1)
# now delete the blocked queue from other broker
ssn1 = cluster[1].connect().session()
self.evaluate_address(ssn1, "flq;{delete:always}")
sender.wait() # Verify no longer blocked.
ssn0.connection.close()
ssn1.connection.close()
cluster_test_logs.verify_logs()
def test_alternate_exchange_update(self):
    """The alternate-exchange setting of exchanges and queues must be
    propagated to new members of a cluster."""
    cluster = self.cluster(1)
    setup = cluster[0].connect().session()
    setup_addresses = [
        # Queue "alt" bound to amq.fanout: destination for the alternates.
        "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}",
        # Direct exchange "ex" with alternate amq.fanout and no bound queues.
        "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}",
        # Queue "q" with alternate amq.fanout.
        "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}",
    ]
    for address in setup_addresses:
        self.evaluate_address(setup, address)

    def verify(broker):
        ssn = broker.connect().session()
        # An unroutable message sent to ex must arrive on alt.
        ssn.sender("ex").send("foo")
        rerouted = ssn.receiver("alt").fetch(timeout=0)
        self.assertEqual("foo", rerouted.content)
        # A message rejected from q must arrive on alt.
        ssn.sender("q").send("bar")
        rejected = ssn.receiver("q").fetch(timeout=0)
        self.assertEqual("bar", rejected.content)
        ssn.acknowledge(rejected, Disposition(REJECTED)) # Reject the message
        rerouted = ssn.receiver("alt").fetch(timeout=0)
        self.assertEqual("bar", rerouted.content)

    # Check the original member, then a newly started one.
    verify(cluster[0])
    cluster.start()
    verify(cluster[1])
def test_binding_order(self):
    """Regression test for binding order inconsistency in cluster"""
    cluster = self.cluster(1)
    conn0 = cluster[0].connect()
    ssn0 = conn0.session()

    def declare_bound_queue(name, limit=0):
        # Declare queue `name` bound to amq.topic with key "key"; a non-zero
        # limit adds a max message count to the queue arguments.
        if limit:
            decl = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%limit
        else:
            decl = 'x-declare:{}'
        binding = 'x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(name)
        ssn0.sender("%s;{create:always,node:{%s,%s}}" % (name,decl,binding))

    # Several queues share the same key; only 'd' has a limit.
    declare_bound_queue('d', limit=4)
    for name in ['c', 'b','a']:
        declare_bound_queue(name)
    # Add a cluster member, then send enough messages to exceed the max count.
    cluster.start()
    try:
        topic = ssn0.sender('amq.topic/key')
        for n in xrange(1,6):
            topic.send(Message(str(n)))
        self.fail("Expected capacity exceeded exception")
    except messaging.exceptions.TargetCapacityExceeded:
        pass
    conn1 = cluster[1].connect()
    ssn1 = conn1.session()
    # The original session on conn0 is broken by the exception; open a new one.
    fresh0 = conn0.session()
    # Queue contents must be the same on both members.
    for name in ['a','b','c','d']:
        self.assertEqual(self.browse(fresh0, name), self.browse(ssn1, name))
    # Queue contents are "best effort": unlimited queues hold all five
    # messages, the limited queue holds the first four.
    for name in ['a','b','c']:
        self.assert_browse(ssn1, name, [str(n) for n in xrange(1,6)])
    self.assert_browse(ssn1, 'd', [str(n) for n in xrange(1,5)])
def test_deleted_exchange(self):
    """QPID-3215: cached exchange reference can cause cluster inconsistencies
    if exchange is deleted/recreated
    Verify stand-alone case
    """
    cluster = self.cluster()
    cluster.start()
    session = cluster[0].connect().session()
    # Topic exchange "ex" with queue "q" bound to it on key "foo".
    self.evaluate_address(session, "ex;{create:always,node:{type:topic}}")
    self.evaluate_address(session, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
    via_exchange = session.sender("ex/foo")
    via_exchange.send("foo")
    self.assert_browse(session, "q", ["foo"])
    # After the exchange is deleted, a message must no longer be routed
    # through it.
    self.evaluate_address(session, "ex;{delete:always}")
    try:
        via_exchange.send("bar") # Should fail, exchange is deleted.
        self.fail("Expected not-found exception")
    except qpid.messaging.NotFound:
        pass
    # The queue still holds only the first message.
    self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
def test_deleted_exchange_inconsistent(self):
    """QPID-3215: cached exchange reference can cause cluster inconsistencies
    if exchange is deleted/recreated
    Verify cluster inconsistency.
    """
    cluster = self.cluster()
    cluster.start()
    session0 = cluster[0].connect().session()
    # Topic exchange "ex" with queue "q" bound to it on key "foo".
    self.evaluate_address(session0, "ex;{create:always,node:{type:topic}}")
    self.evaluate_address(session0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
    via_exchange = session0.sender("ex/foo")
    via_exchange.send("foo")
    self.assert_browse(session0, "q", ["foo"])
    # A second member joins before the exchange is deleted.
    cluster.start()
    session1 = cluster[1].connect().session()
    self.evaluate_address(session0, "ex;{delete:always}")
    try:
        via_exchange.send("bar")
        self.fail("Expected not-found exception")
    except qpid.messaging.NotFound:
        pass
    # The second member must still see only the first message.
    self.assert_browse(session1, "q", ["foo"])
def test_ttl_consistent(self):
    """Ensure we don't get inconsistent errors with message that have TTL very close together"""
    # 1000 messages with TTLs one millisecond apart, followed by an "x"
    # marker message that has no TTL.
    messages = [Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
    messages.append(Message("x"))
    cluster = self.cluster(2)
    sender = cluster[0].connect().session().sender("q;{create:always}")

    def send_batch():
        for m in messages:
            sender.send(m, sync=False)

    def fetch_to_marker(broker):
        # Consume from "q" on broker until the "x" marker is fetched.
        receiver = broker.connect().session().receiver("q;{create:always}")
        while receiver.fetch().content != "x":
            pass

    # Two batches, so each of the two members has a marker to fetch.
    send_batch()
    send_batch()
    fetch_to_marker(cluster[0])
    fetch_to_marker(cluster[1])
    # A third batch is sent before a new member joins and consumes it.
    send_batch()
    cluster.start()
    fetch_to_marker(cluster[2])
# Some utility code for transaction tests
# Status values compared against the .status of dtx command results and
# against DtxStatusException.actual in the DTX tests below.
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
XA_OK = 0
# NOTE(review): not referenced by the code visible in this part of the file -
# confirm whether it is still needed.
dtx_branch_counter = 0
class DtxStatusException(Exception):
    """Raised when a DTX command returns a status other than the expected one.

    Attributes:
      expect -- the status the caller expected.
      actual -- the status that was actually returned.
    """
    def __init__(self, expect, actual):
        # Initialise the base class so args is populated.
        Exception.__init__(self, expect, actual)
        self.expect = expect
        self.actual = actual

    def __str__(self):
        return "DtxStatusException(expect=%s, actual=%s)"%(self.expect, self.actual)

    # The method was previously named str(), which str(exc) never calls;
    # the old name is kept as an alias for any existing callers.
    str = __str__
class DtxTestFixture:
    """Helper bundling a connection, session and queue for one DTX test
    transaction, with wrappers around the dtx commands."""

    def __init__(self, test, broker, name, exclusive=False):
        self.test, self.broker, self.name = test, broker, name
        # DTX is only available through the old API, not the messaging API.
        self.connection = broker.connect_old()
        self.session = self.connection.session(name, 1) # 1 second timeout
        self.queue = self.session.queue_declare(name, exclusive=exclusive)
        self.session.dtx_select()
        self.consumer = None

    def xid(self, id=None):
        """Build an xid; the global id defaults to the fixture's name."""
        if id is None:
            return self.session.xid(format=0, global_id=self.name)
        return self.session.xid(format=0, global_id=id)

    def check_status(self, expect, actual):
        """Raise DtxStatusException unless actual equals expect."""
        if expect != actual:
            raise DtxStatusException(expect, actual)

    def _expect_ok(self, result):
        # Every dtx command wrapper below requires an XA_OK status.
        self.check_status(XA_OK, result.status)

    def start(self, id=None, resume=False):
        self._expect_ok(self.session.dtx_start(xid=self.xid(id), resume=resume))

    def end(self, id=None, suspend=False):
        self._expect_ok(self.session.dtx_end(xid=self.xid(id), suspend=suspend))

    def prepare(self, id=None):
        self._expect_ok(self.session.dtx_prepare(xid=self.xid(id)))

    def commit(self, id=None, one_phase=True):
        self._expect_ok(
            self.session.dtx_commit(xid=self.xid(id), one_phase=one_phase))

    def rollback(self, id=None):
        self._expect_ok(self.session.dtx_rollback(xid=self.xid(id)))

    def set_timeout(self, timeout, id=None):
        self.session.dtx_set_timeout(xid=self.xid(id),timeout=timeout)

    def send(self, messages):
        """Transfer each body in messages, routed with the fixture's name."""
        for body in messages:
            delivery = self.session.delivery_properties(routing_key=self.name)
            properties = self.session.message_properties()
            self.session.message_transfer(
                message=qpid.datatypes.Message(delivery, properties, body))

    def accept(self):
        """Accept 1 message from queue"""
        ssn = self.session
        tag = "%s-consumer"%(self.name)
        ssn.message_subscribe(queue=self.name, destination=tag)
        # Credit for exactly one message, with the maximum byte credit.
        ssn.message_flow(unit = ssn.credit_unit.message, value = 1, destination = tag)
        ssn.message_flow(unit = ssn.credit_unit.byte, value = 0xFFFFFFFF, destination = tag)
        msg = ssn.incoming(tag).get(timeout=1)
        ssn.message_cancel(destination=tag)
        ssn.message_accept(qpid.datatypes.RangedSet(msg.id))
        return msg

    def verify(self, sessions, messages):
        """Browse the fixture's queue via every session, expecting messages."""
        for browsing_session in sessions:
            self.test.assert_browse(browsing_session, self.name, messages)
class DtxTests(BrokerTest):
def test_dtx_update(self):
    """Verify that DTX transaction state is updated to a new broker.
    Start a collection of transactions, then add a new cluster member,
    then verify they commit/rollback correctly on the new broker."""
    # Note: multiple test have been bundled into one to avoid the need to start/stop
    # multiple brokers per test.
    cluster=self.cluster(1)
    sessions = [cluster[0].connect().session()] # For verify
    # Transaction that will be open when new member joins, then committed.
    t1 = DtxTestFixture(self, cluster[0], "t1")
    t1.start()
    t1.send(["1", "2"])
    t1.verify(sessions, []) # Not visible outside of transaction
    # Transaction that will be open when new member joins, then rolled back.
    t2 = DtxTestFixture(self, cluster[0], "t2")
    t2.start()
    t2.send(["1", "2"])
    # Transaction that will be prepared when new member joins, then committed.
    t3 = DtxTestFixture(self, cluster[0], "t3")
    t3.start()
    t3.send(["1", "2"])
    t3.end()
    t3.prepare()
    # Check t3's own queue (this line previously re-checked t1 by mistake).
    t3.verify(sessions, []) # Not visible outside of transaction
    # Transaction that will be prepared when new member joins, then rolled back.
    t4 = DtxTestFixture(self, cluster[0], "t4")
    t4.start()
    t4.send(["1", "2"])
    t4.end()
    t4.prepare()
    # Transaction using an exclusive queue
    t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
    t5.start()
    t5.send(["1", "2"])
    # Accept messages in a transaction before/after join then commit
    t6 = DtxTestFixture(self, cluster[0], "t6")
    t6.send(["a","b","c"])
    t6.start()
    self.assertEqual(t6.accept().body, "a")
    # Accept messages in a transaction before/after join then roll back
    t7 = DtxTestFixture(self, cluster[0], "t7")
    t7.send(["a","b","c"])
    t7.start()
    self.assertEqual(t7.accept().body, "a")
    # Ended, suspended transactions across join.
    t8 = DtxTestFixture(self, cluster[0], "t8")
    t8.start(id="1")
    t8.send(["x"])
    t8.end(id="1", suspend=True)
    t8.start(id="2")
    t8.send(["y"])
    t8.end(id="2")
    t8.start()
    # send() takes a list of bodies; pass one explicitly rather than relying
    # on iteration over a one-character string.
    t8.send(["z"])
    # Start new cluster member
    cluster.start()
    sessions.append(cluster[1].connect().session())
    # Commit t1
    t1.send(["3","4"])
    t1.verify(sessions, [])
    t1.end()
    t1.commit(one_phase=True)
    t1.verify(sessions, ["1","2","3","4"])
    # Rollback t2
    t2.send(["3","4"])
    t2.end()
    t2.rollback()
    t2.verify(sessions, [])
    # Commit t3
    t3.commit(one_phase=False)
    t3.verify(sessions, ["1","2"])
    # Rollback t4
    t4.rollback()
    t4.verify(sessions, [])
    # Commit t5
    t5.send(["3","4"])
    t5.verify(sessions, [])
    t5.end()
    t5.commit(one_phase=True)
    t5.verify(sessions, ["1","2","3","4"])
    # Commit t6
    self.assertEqual(t6.accept().body, "b")
    t6.verify(sessions, ["c"])
    t6.end()
    t6.commit(one_phase=True)
    t6.session.close() # Make sure they're not requeued by the session.
    t6.verify(sessions, ["c"])
    # Rollback t7
    self.assertEqual(t7.accept().body, "b")
    t7.end()
    t7.rollback()
    t7.verify(sessions, ["a", "b", "c"])
    # Resume t8
    t8.end()
    t8.commit(one_phase=True)
    t8.start("1", resume=True)
    t8.end("1")
    t8.commit("1", one_phase=True)
    t8.commit("2", one_phase=True)
    t8.verify(sessions, ["z", "x","y"])
def test_dtx_failover_rollback(self):
    """Kill a broker during a transaction, verify we roll back correctly.

    Two dtx transactions are opened on the first broker: "t1" is still
    open (never ended) and "t2" has been ended and prepared when that
    broker is killed.  In both cases only the message sent outside the
    transaction must be visible on the surviving broker.
    """
    brokers = self.cluster(1, expect=EXPECT_EXIT_FAIL)
    brokers.start(expect=EXPECT_RUNNING)

    # Transaction that is still unprepared when the broker crashes.
    unprepared = DtxTestFixture(self, brokers[0], "t1")
    unprepared.send(["a"])      # Sent outside the transaction
    unprepared.start()
    unprepared.send(["b"])      # Sent inside the transaction

    # Transaction that is already prepared when the broker crashes.
    prepared = DtxTestFixture(self, brokers[0], "t2")
    prepared.send(["a"])        # Sent outside the transaction
    prepared.start()
    prepared.send(["b"])        # Sent inside the transaction
    prepared.end()
    prepared.prepare()

    # Crash the broker that owns both transactions.
    brokers[0].kill()

    # Only the non-transactional messages may show up on the survivor.
    survivor_session = brokers[1].connect().session()
    self.assert_browse(survivor_session, "t1", ["a"])
    self.assert_browse(survivor_session, "t2", ["a"])
def test_dtx_timeout(self):
"""Verify that dtx timeout works"""
cluster = self.cluster(1)
t1 = DtxTestFixture(self, cluster[0], "t1")
t1.start()
t1.set_timeout(1)
time.sleep(1.1)
try:
t1.end()
self.fail("Expected rollback timeout.")
except DtxStatusException, e:
self.assertEqual(e.actual, XA_RBTIMEOUT)
class TxTests(BrokerTest):
    """Cluster tests for tx transactions, driven through the old client API."""

    def test_tx_update(self):
        """Verify that transaction state is updated to a new broker"""

        def build_message(session, body=None, routing_key=None, correlation_id=None):
            """Build an old-API message with the given body, routing key and correlation id."""
            delivery = session.delivery_properties(routing_key=routing_key)
            properties = session.message_properties(correlation_id=correlation_id)
            return qpid.datatypes.Message(delivery, properties, body)

        brokers = self.cluster(1)
        # Use old API. TX is not supported in messaging API.
        connection = brokers[0].connect_old()
        tx_session = connection.session("tx-session", 1)
        tx_session.queue_declare(queue="q")

        # Open the transaction and do the first half of the work.
        tx_session.tx_select()
        tx_session.message_transfer(message=build_message(tx_session, "1", "q"))

        # A new member joins while the transaction is still open.
        brokers.start()

        # Second half of the work, after the join.
        tx_session.message_transfer(message=build_message(tx_session, "2", "q"))

        # Commit and check that every member sees both messages.
        tx_session.tx_commit()
        for broker in brokers:
            self.assert_browse(broker.connect().session(), "q", ["1","2"])
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
    """Return the time, in seconds, that a long test should run for.

    Reads the "DURATION" define (a number of minutes) from the test
    configuration; when it is missing or empty the result is 3.
    """
    minutes = self.config.defines.get("DURATION")
    if not minutes:
        return 3    # Default is to be quick
    return float(minutes)*60
def test_failover(self):
    """Test fail-over during continuous send-receive with errors.

    Starts a 3-broker cluster with an ErrorGenerator attached to each
    broker and runs a NumberedSender/NumberedReceiver pair against the
    first broker.  For self.duration() seconds it then repeatedly kills
    the lowest-indexed live broker and starts a replacement, asserting
    before each kill that the sender and receiver are still running.
    """
    # Original cluster will all be killed so expect exit with failure
    cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
    for b in cluster: b.ready()     # Wait for brokers to be ready
    for b in cluster: ErrorGenerator(b)

    # Start sender and receiver threads
    cluster[0].declare_queue("test-queue")
    sender = NumberedSender(cluster[0], max_depth=1000)
    receiver = NumberedReceiver(cluster[0], sender=sender)
    receiver.start()
    sender.start()
    # Wait for sender & receiver to get up and running
    retry(lambda: receiver.received > 0)

    # Kill original brokers, start new ones for the duration.
    endtime = time.time() + self.duration()
    alive = 0   # Index of the first broker that has not been killed yet.
    while time.time() < endtime:
        sender.sender.assert_running()
        receiver.receiver.assert_running()
        cluster[alive].kill()
        alive += 1
        # Keep the new broker under its own name: the readiness loop below
        # must not clobber the reference handed to ErrorGenerator.
        new_broker = cluster.start(expect=EXPECT_EXIT_FAIL)
        for b in cluster[alive:]: b.ready()
        ErrorGenerator(new_broker)
        time.sleep(5)
    sender.stop()
    receiver.stop()
    # Kill whatever is still running.
    for b in cluster[alive:]: b.kill()
def test_management(self, args=[]):
"""
Stress test: Run management clients and other clients concurrently
while killing and restarting brokers.
"""
class ClientLoop(StoppableThread):
"""Run a client executable in a loop."""
def __init__(self, broker, cmd):
StoppableThread.__init__(self)
self.broker=broker
self.cmd = cmd # Client command.
self.lock = Lock()
self.process = None # Client process.
self.start()
def run(self):
try:
while True:
self.lock.acquire()
try:
if self.stopped: break
self.process = self.broker.test.popen(
self.cmd, expect=EXPECT_UNKNOWN)
finally:
self.lock.release()
try:
exit = self.process.wait()
except OSError, e:
# Process may already have been killed by self.stop()
break
except Exception, e:
self.process.unexpected(
"client of %s: %s"%(self.broker.name, e))
self.lock.acquire()
try:
if self.stopped: break
if exit != 0:
self.process.unexpected(
"client of %s exit code %s"%(self.broker.name, exit))
finally:
self.lock.release()
except Exception, e:
self.error = RethrownException("Error in ClientLoop.run")
def stop(self):
"""Stop the running client and wait for it to exit"""
self.lock.acquire()
try:
if self.stopped: return
self.stopped = True
if self.process:
try: self.process.kill() # Kill the client.
except OSError: pass # The client might not be running.
finally: self.lock.release()
StoppableThread.stop(self)
# body of test_management()
args += ["--mgmt-pub-interval", 1]
args += ["--log-enable=trace+:management"]
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
clients = [] # Per-broker list of clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
def start_clients(broker):
"""Start ordinary clients for a broker."""
cmds=[
["qpid-tool", "localhost:%s"%(broker.port())],
["qpid-perftest", "--count=5000", "--durable=yes",
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()),
"--port", broker.port()],
["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())]
]
clients.append([ClientLoop(broker, cmd) for cmd in cmds])
def start_mclients(broker):
"""Start management clients that make multiple connections."""
cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
mclients.append(ClientLoop(broker, cmd))
endtime = time.time() + self.duration()
# For long duration, first run is a quarter of the duration.
runtime = min(5.0, self.duration() / 3.0)
alive = 0 # First live cluster member
for i in range(len(cluster)): start_clients(cluster[i])
start_mclients(cluster[alive])
while time.time() < endtime:
time.sleep(runtime)
runtime = 5 # Remaining runs 5 seconds, frequent broker kills
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
# Kill the first broker, expect the clients to fail.
b = cluster[alive]
b.ready()
b.kill()
# Stop the brokers clients and all the mclients.
for c in clients[alive] + mclients:
try: c.stop()
except: pass # Ignore expected errors due to broker shutdown.
clients[alive] = []
mclients = []
# Start another broker and clients
alive += 1
cluster.start(expect=EXPECT_EXIT_FAIL)
cluster[-1].ready() # Wait till its ready
start_clients(cluster[-1])
start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
for b in cluster[alive:]:
b.ready() # Verify still alive
b.kill()
# Verify that logs are consistent
cluster_test_logs.verify_logs()
def test_management_qmf2(self):
    """Run the test_management stress test with --mgmt-qmf2=yes passed as an extra argument."""
    qmf2_args = ["--mgmt-qmf2=yes"]
    self.test_management(args=qmf2_args)
def test_connect_consistent(self):
    """Open and close connections to the first broker of a 2-broker
    cluster for self.duration() seconds, in batches of 1000, then verify
    the broker logs."""
    broker_args = ["--mgmt-pub-interval=1", "--log-enable=trace+:management"]
    cluster = self.cluster(2, args=broker_args)
    deadline = time.time() + self.duration()
    while time.time() < deadline:   # Get a management interval
        for _ in xrange(1000):
            cluster[0].connect().close()
    cluster_test_logs.verify_logs()
def test_flowlimit_failover(self):
    """Test fail-over during continuous send-receive with flow control
    active.

    Declares "test-queue" with low flow-control thresholds, runs two
    NumberedSenders and one NumberedReceiver against the first broker,
    then for self.duration() seconds repeatedly kills the first live
    broker and starts a replacement.
    """
    # Original cluster will all be killed so expect exit with failure
    cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
    for b in cluster: b.ready()     # Wait for brokers to be ready

    # create a queue with rather draconian flow control settings
    ssn0 = cluster[0].connect().session()
    s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")

    receiver = NumberedReceiver(cluster[0])
    receiver.start()
    senders = [NumberedSender(cluster[0]) for i in range(1,3)]
    for s in senders:
        s.start()
    # Wait for senders & receiver to get up and running.
    # Compare against the NUMBER of senders: comparing an int with the
    # list itself is always False in Python 2, so the wait could never
    # succeed.
    retry(lambda: receiver.received > 2*len(senders))

    # Kill original brokers, start new ones for the duration.
    endtime = time.time() + self.duration()
    i = 0   # Index of the first broker that has not been killed yet.
    while time.time() < endtime:
        for s in senders: s.sender.assert_running()
        receiver.receiver.assert_running()
        for b in cluster[i:]: b.ready() # Check if any broker crashed.
        cluster[i].kill()
        i += 1
        cluster.start(expect=EXPECT_EXIT_FAIL)
        time.sleep(5)
    for s in senders:
        s.stop()
    receiver.stop()
    # Kill whatever is still running.
    for b in cluster[i:]: b.kill()
def test_ttl_failover(self):
"""Test that messages with TTL don't cause problems in a cluster with failover"""
class Client(StoppableThread):
def __init__(self, broker):
StoppableThread.__init__(self)
self.connection = broker.connect(reconnect=True)
self.auto_fetch_reconnect_urls(self.connection)
self.session = self.connection.session()
def auto_fetch_reconnect_urls(self, conn):
"""Replacment for qpid.messaging.util version which is noisy"""
ssn = conn.session("auto-fetch-reconnect-urls")
rcv = ssn.receiver("amq.failover")
rcv.capacity = 10
def main():
while True:
try:
msg = rcv.fetch()
qpid.messaging.util.set_reconnect_urls(conn, msg)
ssn.acknowledge(msg, sync=False)
except messaging.exceptions.LinkClosed: return
except messaging.exceptions.ConnectionError: return
thread = Thread(name="auto-fetch-reconnect-urls", target=main)
thread.setDaemon(True)
thread.start()
def stop(self):
StoppableThread.stop(self)
self.connection.detach()
class Sender(Client):
def __init__(self, broker, address):
Client.__init__(self, broker)
self.sent = 0 # Number of messages _reliably_ sent.
self.sender = self.session.sender(address, capacity=1000)
def send_counted(self, ttl):
self.sender.send(Message(str(self.sent), ttl=ttl))
self.sent += 1
def run(self):
while not self.stopped:
choice = random.randint(0,4)
if choice == 0: self.send_counted(None) # No ttl
elif choice == 1: self.send_counted(100000) # Large ttl
else: # Small ttl, might expire
self.sender.send(Message("", ttl=random.random()/10))
self.sender.send(Message("z"), sync=True) # Chaser.
class Receiver(Client):
def __init__(self, broker, address):
Client.__init__(self, broker)
self.received = 0 # Number of non-empty (reliable) messages received.
self.receiver = self.session.receiver(address, capacity=1000)
def run(self):
try:
while True:
m = self.receiver.fetch(1)
if m.content == "z": break
if m.content: # Ignore unreliable messages
# Ignore duplicates
if int(m.content) == self.received: self.received += 1
except Exception,e: self.error = e
# def test_ttl_failover
# Original cluster will all be killed so expect exit with failure
# Set small purge interval.
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
for b in cluster: b.ready() # Wait for brokers to be ready
# Python client failover produces noisy WARN logs, disable temporarily
logger = logging.getLogger()
log_level = logger.getEffectiveLevel()
logger.setLevel(logging.ERROR)
sender = None
receiver = None
try:
# Start sender and receiver threads
receiver = Receiver(cluster[0], "q;{create:always}")
receiver.start()
sender = Sender(cluster[0], "q;{create:always}")
sender.start()
# Wait for sender & receiver to get up and running
retry(lambda: receiver.received > 0)
# Kill brokers in a cycle.
endtime = time.time() + self.duration()
runtime = min(5.0, self.duration() / 4.0)
i = 0
while time.time() < endtime:
for b in cluster[i:]: b.ready() # Check if any broker crashed.
cluster[i].kill()
i += 1
b = cluster.start(expect=EXPECT_EXIT_FAIL)
b.ready()
time.sleep(runtime)
sender.stop()
receiver.stop()
for b in cluster[i:]:
b.ready() # Check it didn't crash
b.kill()
self.assertEqual(sender.sent, receiver.received)
cluster_test_logs.verify_logs()
finally:
# Detach to avoid slow reconnect attempts during shut-down if test fails.
if sender: sender.connection.detach()
if receiver: receiver.connection.detach()
logger.setLevel(log_level)
def test_msg_group_failover(self):
    """Test fail-over during continuous send-receive of grouped messages.
    """

    class GroupedTrafficGenerator(Thread):
        """Thread that runs the msg_group_test client and records its exit status."""

        def __init__(self, url, queue, group_key):
            Thread.__init__(self)
            self.url = url
            self.queue = queue
            self.group_key = group_key
            self.status = -1    # Replaced by the client's exit status in run().

        def run(self):
            # generate traffic for approx 10 seconds (2011msgs / 200 per-sec)
            command = [
                "msg_group_test",
                "--broker=%s" % self.url,
                "--address=%s" % self.queue,
                "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
                "--group-key=%s" % self.group_key,
                "--receivers=2",
                "--senders=3",
                "--messages=2011",
                "--send-rate=200",
                "--capacity=11",
                "--ack-frequency=23",
                "--allow-duplicates",
                "--group-size=37",
                "--randomize-group-size",
                "--interleave=13",
            ]
            # Optional extra flag, currently disabled: "--trace"
            self.generator = Popen(command)
            self.status = self.generator.wait()
            return self.status

        def results(self):
            """Wait up to 30 seconds for the client; return its status, or -1 if still running."""
            self.join(timeout=30)   # 3x assumed duration
            if self.isAlive():
                return -1
            return self.status

    # Original cluster will all be killed so expect exit with failure
    cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
    for broker in cluster:
        broker.ready()  # Wait for brokers to be ready

    # Create the message-group queue the traffic generator will use.
    ssn0 = cluster[0].connect().session()
    q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
    s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)

    # Kill original brokers, start new ones for the duration.
    endtime = time.time() + self.duration()
    first_alive = 0
    while time.time() < endtime:
        traffic = GroupedTrafficGenerator(cluster[first_alive].host_port(),
                                          "test-group-q", "group-id")
        traffic.start()
        time.sleep(1)

        # Two kill/replace rounds while the traffic is flowing.
        for _ in range(2):
            for broker in cluster[first_alive:]:
                broker.ready()  # Check if any broker crashed.
            cluster[first_alive].kill()
            first_alive += 1
            cluster.start(expect=EXPECT_EXIT_FAIL)
            time.sleep(1)

        # wait for traffic to finish, verify success
        self.assertEqual(0, traffic.results())

    for broker in cluster[first_alive:]:
        broker.kill()
class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
"""
def args(self):
    """Return the broker arguments that load the store module.

    Asserts that BrokerTest.store_lib is set.
    """
    store_lib = BrokerTest.store_lib
    assert store_lib
    return ["--load-module", store_lib]
def test_store_loaded(self):
    """Ensure we are indeed loading a working store.

    Sends a durable message, kills the broker, restarts a broker with
    the same name and expects to read the message back.
    """
    original = self.broker(self.args(), name="recoverme", expect=EXPECT_EXIT_FAIL)
    original.send_message("q", Message("x", durable=True))
    original.kill()
    recovered = self.broker(self.args(), name="recoverme")
    self.assertEqual("x", recovered.get_message("q").content)
def test_kill_restart(self):
    """Verify we can kill/restart a broker with store in a cluster"""
    cluster = self.cluster(1, self.args())
    victim = cluster.start("restartme", expect=EXPECT_EXIT_FAIL)
    victim.kill()

    # Send a message, retrieve from the restarted broker
    cluster[0].send_message("q", "x")
    restarted = cluster.start("restartme")
    received = restarted.get_message("q")
    self.assertEqual("x", received.content)
def stop_cluster(self,broker):
    """Clean shut-down of a cluster.

    Runs the qpid_cluster script with "-kf" against the broker's address
    and asserts that it returns 0.
    """
    status = qpid_cluster.main(["-kf", broker.host_port()])
    self.assertEqual(0, status)
def test_persistent_restart(self):
    """Verify persistent cluster shutdown/restart scenarios"""
    cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
    broker_a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
    broker_b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
    broker_c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True)
    broker_a.send_message("q", Message("1", durable=True))

    # Kill & restart one member.
    broker_c.kill()
    self.assertEqual(broker_a.get_message("q").content, "1")
    broker_a.send_message("q", Message("2", durable=True))
    broker_c = cluster.start("c", expect=EXPECT_EXIT_OK)
    self.assertEqual(broker_c.get_message("q").content, "2")

    # Shut down the entire cluster cleanly and bring it back up
    broker_a.send_message("q", Message("3", durable=True))
    self.stop_cluster(broker_a)
    broker_a = cluster.start("a", wait=False)
    broker_b = cluster.start("b", wait=False)
    broker_c = cluster.start("c", wait=True)
    self.assertEqual(broker_a.get_message("q").content, "3")
def test_persistent_partial_failure(self):
    """Kill 2 members, shut down the last cleanly then restart.

    After the restart the cluster must use the clean database, i.e. the
    one holding the message sent to the last member.
    """
    cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
    broker_a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
    broker_b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
    broker_c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
    broker_a.send_message("q", Message("4", durable=True))

    # Two members die, the third carries on alone.
    broker_a.kill()
    broker_b.kill()
    self.assertEqual(broker_c.get_message("q").content, "4")
    broker_c.send_message("q", Message("clean", durable=True))
    self.stop_cluster(broker_c)

    # Restart all three members.
    broker_a = cluster.start("a", wait=False)
    broker_b = cluster.start("b", wait=False)
    broker_c = cluster.start("c", wait=True)
    self.assertEqual(broker_a.get_message("q").content, "clean")
def test_wrong_cluster_id(self):
    """Start a cluster1 broker, then try to restart it in cluster2.

    Starting broker "a" in the second cluster, or waiting for it to
    become ready, is expected to raise; the test fails if neither does.
    """
    cluster1 = self.cluster(0, args=self.args())
    a = cluster1.start("a", expect=EXPECT_EXIT_OK)
    a.terminate()
    cluster2 = self.cluster(1, args=self.args())
    try:
        a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
        a.ready()
    except Exception:
        pass    # Expected: the broker must not join the wrong cluster.
    else:
        # Must be outside the try block: the original bare "except: pass"
        # swallowed this failure, so the test could never fail.
        self.fail("Expected exception")
def test_wrong_shutdown_id(self):
    """Members taken from two different clean shutdowns must not start together."""
    cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])

    # Start 2 members and shut down.
    broker_a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
    broker_b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
    self.stop_cluster(broker_a)
    self.assertEqual(broker_a.wait(), 0)
    self.assertEqual(broker_b.wait(), 0)

    # Restart with a different member and shut down.
    broker_a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
    broker_c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
    self.stop_cluster(broker_a)
    self.assertEqual(broker_a.wait(), 0)
    self.assertEqual(broker_c.wait(), 0)

    # Mix members from both shutdown events, they should fail
    # TODO aconway 2010-03-11: can't predict the exit status of these
    # as it depends on the order of delivery of initial-status messages.
    # See comment at top of this file.
    broker_a = cluster.start("a", expect=EXPECT_UNKNOWN, wait=False)
    broker_b = cluster.start("b", expect=EXPECT_UNKNOWN, wait=False)
    self.assertRaises(Exception, broker_a.ready)
    self.assertRaises(Exception, broker_b.ready)
def test_solo_store_clean(self):
    """A single node cluster should always leave a clean store.

    Kills the only member after it stored a durable message, restarts
    it and expects to read the message back.
    """
    cluster = self.cluster(0, self.args())
    solo = cluster.start("a", expect=EXPECT_EXIT_FAIL)
    solo.send_message("q", Message("x", durable=True))
    solo.kill()
    solo = cluster.start("a")
    self.assertEqual(solo.get_message("q").content, "x")
def test_last_store_clean(self):
    """Verify that only the last node in a cluster to shut down has a
    clean store.

    Start with cluster of 3, reduce to 1 then increase again to ensure
    that a node that was once alone but finally did not finish as the
    last node does not get a clean store.
    """
    cluster = self.cluster(0, self.args())
    broker_a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
    self.assertEqual(broker_a.store_state(), "clean")

    # Once other members join, every store is dirty.
    broker_b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
    broker_c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
    self.assertEqual(broker_b.store_state(), "dirty")
    self.assertEqual(broker_c.store_state(), "dirty")
    retry(lambda: broker_a.store_state() == "dirty")

    broker_a.send_message("q", Message("x", durable=True))
    broker_a.kill()
    broker_b.kill()     # c is last man, will mark store clean
    retry(lambda: broker_c.store_state() == "clean")

    broker_a = cluster.start("a", expect=EXPECT_EXIT_FAIL)  # c no longer last man
    retry(lambda: broker_c.store_state() == "dirty")
    broker_c.kill()     # a is now last man
    retry(lambda: broker_a.store_state() == "clean")
    broker_a.kill()

    self.assertEqual(broker_a.store_state(), "clean")
    self.assertEqual(broker_b.store_state(), "dirty")
    self.assertEqual(broker_c.store_state(), "dirty")
def test_restart_clean(self):
    """Verify that we can re-start brokers one by one in a
    persistent cluster after a clean shutdown"""
    cluster = self.cluster(0, self.args())
    broker_a = cluster.start("a", expect=EXPECT_EXIT_OK)
    broker_b = cluster.start("b", expect=EXPECT_EXIT_OK)
    broker_c = cluster.start("c", expect=EXPECT_EXIT_OK)
    broker_a.send_message("q", Message("x", durable=True))
    self.stop_cluster(broker_a)

    # Bring the members back one at a time.
    broker_a = cluster.start("a")
    broker_b = cluster.start("b")
    broker_c = cluster.start("c")
    self.assertEqual(broker_c.get_message("q").content, "x")
def test_join_sub_size(self):
    """Verify that after starting a cluster with cluster-size=N,
    we can join new members even if size < N-1"""
    cluster = self.cluster(0, self.args()+["--cluster-size=3"])
    broker_a = cluster.start("a", wait=False, expect=EXPECT_EXIT_FAIL)
    broker_b = cluster.start("b", wait=False, expect=EXPECT_EXIT_FAIL)
    broker_c = cluster.start("c")
    broker_a.send_message("q", Message("x", durable=True))
    broker_a.send_message("q", Message("y", durable=True))

    # Drop to a single member, then rejoin the others one at a time.
    broker_a.kill()
    broker_b.kill()
    broker_a = cluster.start("a")
    self.assertEqual(broker_c.get_message("q").content, "x")
    broker_b = cluster.start("b")
    self.assertEqual(broker_c.get_message("q").content, "y")