#!/usr/bin/env python
| |
def publisher(n):
    """Publish a burst of messages, then wait for the consumer's sync reply.

    Sends ``n`` copies of one "message" body to the ``amq.direct`` exchange
    with routing key ``message_queue``, followed by a single "That's done"
    message on the same key. It then subscribes to ``sync_queue`` and blocks
    until one message arrives there before closing the session.

    If the command line carries a third argument (``sys.argv[2]``), that
    value replaces ``n``.
    """
    import qpid
    import sys
    from qpid.client import Client
    from qpid.content import Content

    # A count given on the command line takes precedence over the argument.
    argv = sys.argv
    if len(argv) >= 3:
        n = int(argv[2])

    broker = Client("127.0.0.1", 5672)
    broker.start({"LOGIN": "guest", "PASSWORD": "guest"})
    session = broker.channel(1)
    session.session_open()

    # The same Content object is re-sent for every message in the burst.
    payload = Content("message")
    payload["routing_key"] = "message_queue"
    print("producing  %s  messages" % (n,))
    for _ in range(n):
        session.message_transfer(destination="amq.direct", content=payload)

    print("producing final message")
    farewell = Content("That's done")
    farewell["routing_key"] = "message_queue"
    session.message_transfer(destination="amq.direct", content=farewell)

    print("consuming sync message")
    tag = "consumer"
    inbox = broker.queue(tag)
    session.message_subscribe(queue="sync_queue", destination=tag)
    # Units 0 and 1 each get the maximum 32-bit value; presumably these are
    # the message and byte credit units -- confirm against the qpid API.
    for unit in (0, 1):
        session.message_flow(tag, unit, 0xFFFFFFFF)
    inbox.get(block=True)
    print("done")
    session.session_close()
| |
def consumer():
    """Drain ``message_queue`` until the terminator, then signal the publisher.

    Subscribes to ``message_queue`` and reads messages, blocking on each,
    until one whose body is "That's done" arrives. It then completes the
    last message cumulatively, publishes a single "message" body to
    ``amq.direct`` with routing key ``sync_queue``, and closes the session.
    """
    import sys
    import qpid
    from qpid.client import Client
    from qpid.content import Content

    broker = Client("127.0.0.1", 5672)
    broker.start({"LOGIN": "guest", "PASSWORD": "guest"})
    session = broker.channel(1)
    session.session_open()

    tag = "consumer"
    inbox = broker.queue(tag)
    session.message_subscribe(queue="message_queue", destination=tag)
    # Units 0 and 1 each get the maximum 32-bit value; presumably these are
    # the message and byte credit units -- confirm against the qpid API.
    for unit in (0, 1):
        session.message_flow(tag, unit, 0xFFFFFFFF)

    terminator = "That's done"
    print("getting messages")
    # At least one message is always read; stop on the terminator body.
    while True:
        received = inbox.get(block=True)
        if received.content.body == terminator:
            break
    # One cumulative completion on the final message; presumably this covers
    # every earlier message as well -- confirm against qpid semantics.
    received.complete(cumulative=True)

    print("consumed all messages")
    reply = Content("message")
    reply["routing_key"] = "sync_queue"
    session.message_transfer(destination="amq.direct", content=reply)
    print("done")
    session.session_close()
| |
if __name__ == '__main__':
    # Script entry point.
    #
    # Usage: <script> prod|cons [message_count]
    #
    # Declares and binds "message_queue" and "sync_queue" on the amq.direct
    # exchange, then times a single run of publisher() or consumer() and
    # prints the elapsed time reported by timeit.
    import sys
    import qpid
    from timeit import Timer
    from qpid.client import Client
    from qpid.content import Content

    # Validate the command line before touching the broker, so a usage
    # mistake is reported as such rather than being hidden behind a
    # connection failure, and exit non-zero so callers can detect it.
    if len(sys.argv) == 1:
        print("error: please specify prod or cons")
        sys.exit(1)
    mode = sys.argv[1]
    if mode not in ('prod', 'cons'):
        print("please specify prod or cons")
        sys.exit(1)

    numMess = 100
    if len(sys.argv) >= 3:
        numMess = int(sys.argv[2])

    # Make sure both queues exist and are bound before either role starts.
    client = Client("127.0.0.1", 5672)
    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
    channel = client.channel(1)
    channel.session_open()
    channel.queue_declare(queue="message_queue")
    channel.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="message_queue")
    channel.queue_declare(queue="sync_queue")
    channel.queue_bind(exchange="amq.direct", queue="sync_queue", routing_key="sync_queue")
    channel.session_close()

    # Single-argument print(...) gives the same output as the Python 2
    # print statement; the doubled spaces reproduce its item separators.
    if mode == 'prod':
        # Pass the parsed count instead of a hard-coded 100 so the timed
        # call and the reported figure cannot drift apart.
        tprod = Timer("publisher(%d)" % numMess, "from __main__ import publisher")
        tp = tprod.timeit(1)
        print("produced and consumed %s messages in:  %s" % (numMess + 2, tp))
    else:
        tcons = Timer("consumer()", "from __main__ import consumer")
        tc = tcons.timeit(1)
        print("consumed  %s  in:  %s" % (numMess, tc))