| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| def publisher(n): |
| import qpid |
| import sys |
| from qpid.client import Client |
| from qpid.content import Content |
| if len(sys.argv) >= 3: |
| n = int(sys.argv[2]) |
| client = Client("127.0.0.1", 5672) |
| client.start({"LOGIN": "guest", "PASSWORD": "guest"}) |
| channel = client.channel(1) |
| channel.session_open() |
| message = Content("message") |
| message["routing_key"] = "message_queue" |
| print "producing ", n, " messages" |
| for i in range(n): |
| channel.message_transfer(destination="amq.direct", content=message) |
| |
| print "producing final message" |
| message = Content("That's done") |
| message["routing_key"] = "message_queue" |
| channel.message_transfer(destination="amq.direct", content=message) |
| |
| print "consuming sync message" |
| consumer = "consumer" |
| queue = client.queue(consumer) |
| channel.message_subscribe(queue="sync_queue", destination=consumer) |
| channel.message_flow(consumer, 0, 0xFFFFFFFF) |
| channel.message_flow(consumer, 1, 0xFFFFFFFF) |
| queue.get(block = True) |
| print "done" |
| channel.session_close() |
| |
| def consumer(): |
| import sys |
| import qpid |
| from qpid.client import Client |
| from qpid.content import Content |
| client = Client("127.0.0.1", 5672) |
| client.start({"LOGIN": "guest", "PASSWORD": "guest"}) |
| channel = client.channel(1) |
| channel.session_open() |
| consumer = "consumer" |
| queue = client.queue(consumer) |
| channel.message_subscribe(queue="message_queue", destination=consumer) |
| channel.message_flow(consumer, 0, 0xFFFFFFFF) |
| channel.message_flow(consumer, 1, 0xFFFFFFFF) |
| final = "That's done" |
| content = "" |
| message = None |
| print "getting messages" |
| while content != final: |
| message = queue.get(block = True) |
| content = message.content.body |
| message.complete(cumulative=True) |
| |
| print "consumed all messages" |
| message = Content("message") |
| message["routing_key"] = "sync_queue" |
| channel.message_transfer(destination="amq.direct", content=message) |
| print "done" |
| channel.session_close() |
| |
| if __name__=='__main__': |
| import sys |
| import qpid |
| from timeit import Timer |
| from qpid.client import Client |
| from qpid.content import Content |
| client = Client("127.0.0.1", 5672) |
| client.start({"LOGIN": "guest", "PASSWORD": "guest"}) |
| channel = client.channel(1) |
| channel.session_open() |
| channel.queue_declare(queue="message_queue") |
| channel.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="message_queue") |
| channel.queue_declare(queue="sync_queue") |
| channel.queue_bind(exchange="amq.direct", queue="sync_queue", routing_key="sync_queue") |
| channel.session_close() |
| |
| numMess = 100 |
| if len(sys.argv) >= 3: |
| numMess = int(sys.argv[2]) |
| if len(sys.argv) == 1: |
| print "error: please specify prod or cons" |
| elif sys.argv[1] == 'prod': |
| tprod = Timer("publisher(100)", "from __main__ import publisher") |
| tp = tprod.timeit(1) |
| print "produced and consumed" , numMess + 2 ,"messages in: ", tp |
| elif sys.argv[1] == 'cons': |
| tcons = Timer("consumer()", "from __main__ import consumer") |
| tc = tcons.timeit(1) |
| print "consumed " , numMess ," in: ", tc |
| else: |
| print "please specify prod or cons" |