blob: 436c429afcf8dd8d6aae8737a3e39cc3a16e8582 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
from qpid.datatypes import Message, RangedSet
from qpid.testlib import TestBase010
from qpid.management import managementChannel, managementClient
from threading import Condition
from time import sleep
import qmf.console
import qpid.messaging
from qpid.messaging.exceptions import Empty
from qpidtoollibs import BrokerAgent
class ManagementTest (TestBase010):
def setup_access(self):
if 'broker_agent' not in self.__dict__:
self.conn2 = qpid.messaging.Connection(self.broker)
self.conn2.open()
self.broker_agent = BrokerAgent(self.conn2)
return self.broker_agent
"""
Tests for the management hooks
"""
def test_broker_connectivity_oldAPI (self):
"""
Call the "echo" method on the broker to verify it is alive and talking.
"""
session = self.session
mc = managementClient ()
mch = mc.addChannel (session)
mc.syncWaitForStable (mch)
brokers = mc.syncGetObjects (mch, "broker")
self.assertEqual (len (brokers), 1)
broker = brokers[0]
args = {}
body = "Echo Message Body"
args["body"] = body
for seq in range (1, 5):
args["sequence"] = seq
res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args)
self.assertEqual (res.status, 0)
self.assertEqual (res.statusText, "OK")
self.assertEqual (res.sequence, seq)
self.assertEqual (res.body, body)
mc.removeChannel (mch)
def test_methods_sync (self):
"""
Call the "echo" method on the broker to verify it is alive and talking.
"""
session = self.session
self.startQmf()
brokers = self.qmf.getObjects(_class="broker")
self.assertEqual(len(brokers), 1)
broker = brokers[0]
body = "Echo Message Body"
for seq in range(1, 20):
res = broker.echo(seq, body)
self.assertEqual(res.status, 0)
self.assertEqual(res.text, "OK")
self.assertEqual(res.sequence, seq)
self.assertEqual(res.body, body)
def test_get_objects(self):
self.startQmf()
# get the package list, verify that the qpid broker package is there
packages = self.qmf.getPackages()
assert 'org.apache.qpid.broker' in packages
# get the schema class keys for the broker, verify the broker table and link-down event
keys = self.qmf.getClasses('org.apache.qpid.broker')
broker = None
linkDown = None
for key in keys:
if key.getClassName() == "broker": broker = key
if key.getClassName() == "brokerLinkDown" : linkDown = key
assert broker
assert linkDown
brokerObjs = self.qmf.getObjects(_class="broker")
assert len(brokerObjs) == 1
brokerObjs = self.qmf.getObjects(_key=broker)
assert len(brokerObjs) == 1
def test_self_session_id (self):
self.startQmf()
sessionId = self.qmf_broker.getSessionId()
brokerSessions = self.qmf.getObjects(_class="session")
found = False
for bs in brokerSessions:
if bs.name.endswith(sessionId):
found = True
self.assertEqual (found, True)
def test_standard_exchanges (self):
self.startQmf()
exchanges = self.qmf.getObjects(_class="exchange")
exchange = self.findExchange (exchanges, "")
self.assertEqual (exchange.type, "direct")
exchange = self.findExchange (exchanges, "amq.direct")
self.assertEqual (exchange.type, "direct")
exchange = self.findExchange (exchanges, "amq.topic")
self.assertEqual (exchange.type, "topic")
exchange = self.findExchange (exchanges, "amq.fanout")
self.assertEqual (exchange.type, "fanout")
exchange = self.findExchange (exchanges, "amq.match")
self.assertEqual (exchange.type, "headers")
exchange = self.findExchange (exchanges, "qpid.management")
self.assertEqual (exchange.type, "topic")
def findExchange (self, exchanges, name):
for exchange in exchanges:
if exchange.name == name:
return exchange
return None
def test_move_queued_messages_empty(self):
"""
Test that moving messages from an empty queue does not cause an error.
"""
self.startQmf()
session = self.session
"Set up source queue"
session.queue_declare(queue="src-queue-empty", exclusive=True, auto_delete=True)
"Set up destination queue"
session.queue_declare(queue="dest-queue-empty", exclusive=True, auto_delete=True)
queues = self.qmf.getObjects(_class="queue")
"Move all messages from src-queue-empty to dest-queue-empty"
result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue-empty", "dest-queue-empty", 0, {})
self.assertEqual (result.status, 0)
sq = self.qmf.getObjects(_class="queue", name="src-queue-empty")[0]
dq = self.qmf.getObjects(_class="queue", name="dest-queue-empty")[0]
self.assertEqual (sq.msgDepth,0)
self.assertEqual (dq.msgDepth,0)
def test_move_queued_messages(self):
"""
Test ability to move messages from the head of one queue to another.
Need to test moveing all and N messages.
"""
self.startQmf()
session = self.session
"Set up source queue"
session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True)
session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key")
twenty = range(1,21)
props = session.delivery_properties(routing_key="routing_key")
for count in twenty:
body = "Move Message %d" % count
src_msg = Message(props, body)
session.message_transfer(destination="amq.direct", message=src_msg)
"Set up destination queue"
session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
session.exchange_bind(queue="dest-queue", exchange="amq.direct")
queues = self.qmf.getObjects(_class="queue")
"Move 10 messages from src-queue to dest-queue"
result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {})
self.assertEqual (result.status, 0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,10)
self.assertEqual (dq.msgDepth,10)
"Move all remaining messages to destination"
result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {})
self.assertEqual (result.status,0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,0)
self.assertEqual (dq.msgDepth,20)
"Use a bad source queue name"
result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {})
self.assertEqual (result.status,4)
"Use a bad destination queue name"
result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {})
self.assertEqual (result.status,4)
" Use a large qty (40) to move from dest-queue back to "
" src-queue- should move all "
result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {})
self.assertEqual (result.status,0)
sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,20)
self.assertEqual (dq.msgDepth,0)
"Consume the messages of the queue and check they are all there in order"
session.message_subscribe(queue="src-queue", destination="tag")
session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("tag")
for count in twenty:
consumed_msg = queue.get(timeout=1)
body = "Move Message %d" % count
self.assertEqual(body, consumed_msg.body)
def test_purge_queue(self):
"""
Test ability to purge messages from the head of a queue.
Need to test moveing all, 1 (top message) and N messages.
"""
self.startQmf()
session = self.session
"Set up purge queue"
session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True)
session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key")
twenty = range(1,21)
props = session.delivery_properties(routing_key="routing_key")
for count in twenty:
body = "Purge Message %d" % count
msg = Message(props, body)
session.message_transfer(destination="amq.direct", message=msg)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
"Purge top message from purge-queue"
result = pq.purge(1, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,19)
"Purge top 9 messages from purge-queue"
result = pq.purge(9, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,10)
"Purge all messages from purge-queue"
result = pq.purge(0, {})
self.assertEqual (result.status, 0)
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
def test_reroute_priority_queue(self):
self.startQmf()
session = self.session
#setup test queue supporting multiple priority levels
session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10})
#send some messages of varying priority to that queue:
for i in range(0, 5):
deliveryProps = session.delivery_properties(routing_key="test-queue", priority=i+5)
session.message_transfer(message=Message(deliveryProps, "Message %d" % (i+1)))
#declare and bind a queue to amq.fanout through which rerouted
#messages can be verified:
session.queue_declare(queue="rerouted", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10})
session.exchange_bind(queue="rerouted", exchange="amq.fanout")
#reroute messages from test queue to amq.fanout (and hence to
#rerouted queue):
pq = self.qmf.getObjects(_class="queue", name="test-queue")[0]
result = pq.reroute(0, False, "amq.fanout", {})
self.assertEqual(result.status, 0)
#verify messages are all rerouted:
self.subscribe(destination="incoming", queue="rerouted")
incoming = session.incoming("incoming")
for i in range(0, 5):
msg = incoming.get(timeout=1)
self.assertEqual("Message %d" % (5-i), msg.body)
def test_reroute_queue(self):
"""
Test ability to reroute messages from the head of a queue.
Need to test moving all, 1 (top message) and N messages.
"""
self.startQmf()
session = self.session
"Set up test queue"
session.exchange_declare(exchange="alt.direct1", type="direct")
session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True)
session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key")
session.exchange_declare(exchange="alt.direct2", type="direct")
session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True)
session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key")
session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")
twenty = range(1,21)
props = session.delivery_properties(routing_key="routing_key")
mp = session.message_properties(application_headers={'x-qpid.trace' : 'A,B,C'})
for count in twenty:
body = "Reroute Message %d" % count
msg = Message(props, mp, body)
session.message_transfer(destination="amq.direct", message=msg)
pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
"Reroute top message from reroute-queue to alternate exchange"
result = pq.reroute(1, True, "", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
self.assertEqual(pq.msgDepth,19)
self.assertEqual(aq.msgDepth,1)
"Verify that the trace was cleared on the rerouted message"
url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port or 5672)
conn = qpid.messaging.Connection(url)
conn.open()
sess = conn.session()
rx = sess.receiver("alt-queue1;{mode:browse}")
rm = rx.fetch(1)
self.assertEqual(rm.properties['x-qpid.trace'], '')
conn.close()
"Reroute top 9 messages from reroute-queue to alt.direct2"
result = pq.reroute(9, False, "alt.direct2", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
self.assertEqual(pq.msgDepth,10)
self.assertEqual(aq.msgDepth,9)
"Reroute using a non-existent exchange"
result = pq.reroute(0, False, "amq.nosuchexchange", {})
self.assertEqual(result.status, 4)
"Reroute all messages from reroute-queue"
result = pq.reroute(0, False, "alt.direct2", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
self.assertEqual(pq.msgDepth,0)
self.assertEqual(aq.msgDepth,19)
"Make more messages"
twenty = range(1,21)
props = session.delivery_properties(routing_key="routing_key")
for count in twenty:
body = "Reroute Message %d" % count
msg = Message(props, body)
session.message_transfer(destination="amq.direct", message=msg)
"Reroute onto the same queue"
result = pq.reroute(0, False, "amq.direct", {})
self.assertEqual(result.status, 0)
pq.update()
self.assertEqual(pq.msgDepth,20)
def test_reroute_alternate_exchange(self):
"""
Test that when rerouting, the alternate-exchange is considered if relevant
"""
self.startQmf()
session = self.session
# 1. Create 2 exchanges A and B (fanout) where B is the
# alternate exchange for A
session.exchange_declare(exchange="B", type="fanout")
session.exchange_declare(exchange="A", type="fanout", alternate_exchange="B")
# 2. Bind queue X to B
session.queue_declare(queue="X", exclusive=True, auto_delete=True)
session.exchange_bind(queue="X", exchange="B")
# 3. Send 1 message to queue Y
session.queue_declare(queue="Y", exclusive=True, auto_delete=True)
props = session.delivery_properties(routing_key="Y")
session.message_transfer(message=Message(props, "reroute me!"))
# 4. Call reroute on queue Y and specify that messages should
# be sent to exchange A
y = self.qmf.getObjects(_class="queue", name="Y")[0]
result = y.reroute(1, False, "A", {})
self.assertEqual(result.status, 0)
# 5. verify that the message is rerouted through B (as A has
# no matching bindings) to X
self.subscribe(destination="x", queue="X")
self.assertEqual("reroute me!", session.incoming("x").get(timeout=1).body)
# Cleanup
for e in ["A", "B"]: session.exchange_delete(exchange=e)
def test_reroute_invalid_alt_exchange(self):
"""
Test that an error is returned for an attempt to reroute to
alternate exchange on a queue for which no such exchange has
been defined.
"""
self.startQmf()
session = self.session
# create queue with no alt-exchange, and send a message to it
session.queue_declare(queue="q", exclusive=True, auto_delete=True)
props = session.delivery_properties(routing_key="q")
session.message_transfer(message=Message(props, "don't reroute me!"))
# attempt to reroute the message to alt-exchange
q = self.qmf.getObjects(_class="queue", name="q")[0]
result = q.reroute(1, True, "", {})
# verify the attempt fails...
self.assertEqual(result.status, 4) #invalid parameter
# ...and message is still on the queue
self.subscribe(destination="d", queue="q")
self.assertEqual("don't reroute me!", session.incoming("d").get(timeout=1).body)
def test_methods_async (self):
"""
"""
class Handler (qmf.console.Console):
def __init__(self):
self.cv = Condition()
self.xmtList = {}
self.rcvList = {}
def methodResponse(self, broker, seq, response):
self.cv.acquire()
try:
self.rcvList[seq] = response
finally:
self.cv.release()
def request(self, broker, count):
self.count = count
for idx in range(count):
self.cv.acquire()
try:
seq = broker.echo(idx, "Echo Message", _async = True)
self.xmtList[seq] = idx
finally:
self.cv.release()
def check(self):
if self.count != len(self.xmtList):
return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
lost = 0
mismatched = 0
for seq in self.xmtList:
value = self.xmtList[seq]
if seq in self.rcvList:
result = self.rcvList.pop(seq)
if result.sequence != value:
mismatched += 1
else:
lost += 1
spurious = len(self.rcvList)
if lost == 0 and mismatched == 0 and spurious == 0:
return "pass"
else:
return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
handler = Handler()
self.startQmf(handler)
brokers = self.qmf.getObjects(_class="broker")
self.assertEqual(len(brokers), 1)
broker = brokers[0]
handler.request(broker, 20)
sleep(1)
self.assertEqual(handler.check(), "pass")
def test_connection_close(self):
"""
Test management method for closing connection
"""
self.startQmf()
conn = self.connect()
session = conn.session("my-named-session")
#using qmf find named session and close the corresponding connection:
qmf_ssn_object = [s for s in self.qmf.getObjects(_class="session") if s.name.endswith("my-named-session")][0]
qmf_ssn_object._connectionRef_.close()
#check that connection is closed
try:
conn.session("another-session")
self.fail("Expected failure from closed connection")
except: None
#make sure that the named session has been closed and the name can be re-used
conn = self.connect()
session = conn.session("my-named-session")
session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
def test_immediate_method(self):
url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host or "localhost", self.broker.port or 5672)
conn = qpid.messaging.Connection(url)
conn.open()
sess = conn.session()
replyTo = "qmf.default.direct/reply_immediate_method_test;{node:{type:topic}}"
agent_sender = sess.sender("qmf.default.direct/broker")
agent_receiver = sess.receiver(replyTo)
queue_create = sess.sender("test-queue-imm-method;{create:always,delete:always,node:{type:queue,durable:False,x-declare:{auto-delete:True}}}")
method_request = {'_method_name':'reroute','_object_id':{'_object_name':'org.apache.qpid.broker:queue:test-queue-imm-method'}}
method_request['_arguments'] = {'request':0, 'useAltExchange':False, 'exchange':'amq.fanout'}
reroute_call = qpid.messaging.Message(method_request)
reroute_call.properties['qmf.opcode'] = '_method_request'
reroute_call.properties['x-amqp-0-10.app-id'] = 'qmf2'
reroute_call.reply_to = replyTo
agent_sender.send(reroute_call)
result = agent_receiver.fetch(3)
self.assertEqual(result.properties['qmf.opcode'], '_method_response')
conn.close()
def test_binding_count_on_queue(self):
self.startQmf()
conn = self.connect()
session = self.session
QUEUE = "binding_test_queue"
EX_DIR = "binding_test_exchange_direct"
EX_FAN = "binding_test_exchange_fanout"
EX_TOPIC = "binding_test_exchange_topic"
EX_HDR = "binding_test_exchange_headers"
#
# Create a test queue
#
session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True)
queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0]
if not queue:
self.fail("Queue not found")
self.assertEqual(queue.bindingCount, 1, "wrong initial binding count")
#
# Create an exchange of each supported type
#
session.exchange_declare(exchange=EX_DIR, type="direct")
session.exchange_declare(exchange=EX_FAN, type="fanout")
session.exchange_declare(exchange=EX_TOPIC, type="topic")
session.exchange_declare(exchange=EX_HDR, type="headers")
#
# Bind each exchange to the test queue
#
match = {}
match['x-match'] = "all"
match['key'] = "value"
session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1")
session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
session.exchange_bind(exchange=EX_FAN, queue=QUEUE)
session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#")
session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match)
match['key2'] = "value2"
session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match)
#
# Verify that the queue's binding count accounts for the new bindings
#
queue.update()
self.assertEqual(queue.bindingCount, 8,
"added bindings not accounted for (expected 8, got %d)" % queue.bindingCount)
#
# Remove some of the bindings
#
session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2")
#
# Verify that the queue's binding count accounts for the deleted bindings
#
queue.update()
self.assertEqual(queue.bindingCount, 5,
"deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount)
#
# Delete the exchanges
#
session.exchange_delete(exchange=EX_DIR)
session.exchange_delete(exchange=EX_FAN)
session.exchange_delete(exchange=EX_TOPIC)
session.exchange_delete(exchange=EX_HDR)
#
# Verify that the queue's binding count accounts for the lost bindings
#
queue.update()
self.assertEqual(queue.bindingCount, 1,
"deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
def test_connection_stats(self):
"""
Test message in/out stats for connection
"""
agent = self.setup_access()
conn = self.connect()
session = conn.session("stats-session")
#using qmf find named session and the corresponding connection:
conn_qmf = None
sessions = agent.getAllSessions()
for s in sessions:
if s.name.endswith("stats-session"):
conn_qmf = agent.getConnection(s.connectionRef)
assert(conn_qmf)
#send a message to a queue
session.queue_declare(queue="stats-q", exclusive=True, auto_delete=True)
session.message_transfer(message=Message(session.delivery_properties(routing_key="stats-q"), "abc"))
#check the 'msgs sent from' stat for this connection
conn_qmf.update()
self.assertEqual(conn_qmf.msgsFromClient, 1)
#receive message from queue
session.message_subscribe(destination="d", queue="stats-q")
incoming = session.incoming("d")
incoming.start()
self.assertEqual("abc", incoming.get(timeout=1).body)
#check the 'msgs sent to' stat for this connection
conn_qmf.update()
self.assertEqual(conn_qmf.msgsToClient, 1)
def test_timestamp_config(self):
"""
Test message timestamping control.
"""
self.startQmf()
conn = self.connect()
session = conn.session("timestamp-session")
#verify that receive message timestamping is OFF by default
broker = self.qmf.getObjects(_class="broker")[0]
rc = broker.getTimestampConfig()
self.assertEqual(rc.status, 0)
self.assertEqual(rc.text, "OK")
#try to enable it
rc = broker.setTimestampConfig(True)
self.assertEqual(rc.status, 0)
self.assertEqual(rc.text, "OK")
rc = broker.getTimestampConfig()
self.assertEqual(rc.status, 0)
self.assertEqual(rc.text, "OK")
self.assertEqual(rc.receive, True)
# setup a connection & session to the broker
url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host or "localhost", self.broker.port or 5672)
conn = qpid.messaging.Connection(url)
conn.open()
sess = conn.session()
#send a message to a queue
sender = sess.sender("ts-q; {create:sender, delete:receiver}")
sender.send( qpid.messaging.Message(content="abc") )
#receive message from queue, and verify timestamp is present
receiver = sess.receiver("ts-q")
try:
msg = receiver.fetch(timeout=1)
except Empty:
assert(False)
self.assertEqual("abc", msg.content)
self.assertEqual(True, "x-amqp-0-10.timestamp" in msg.properties)
assert(msg.properties["x-amqp-0-10.timestamp"])
#try to disable it
rc = broker.setTimestampConfig(False)
self.assertEqual(rc.status, 0)
self.assertEqual(rc.text, "OK")
rc = broker.getTimestampConfig()
self.assertEqual(rc.status, 0)
self.assertEqual(rc.text, "OK")
self.assertEqual(rc.receive, False)
#send another message to the queue
sender.send( qpid.messaging.Message(content="def") )
#receive message from queue, and verify timestamp is NOT PRESENT
receiver = sess.receiver("ts-q")
try:
msg = receiver.fetch(timeout=1)
except Empty:
assert(False)
self.assertEqual("def", msg.content)
self.assertEqual(False, "x-amqp-0-10.timestamp" in msg.properties)