blob: f4d3c95e964766a2a1dd2239804816cb81d6bdae [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
def publisher(n):
    """Publish ``n`` messages plus a terminator, then wait for the sync reply.

    Connects to the broker at 127.0.0.1:5672 as guest/guest, sends ``n``
    copies of the body "message" followed by one "That's done" message to
    the "amq.direct" exchange with routing key "message_queue", then
    subscribes to "sync_queue" and blocks until one message arrives.

    Args:
        n: Number of ordinary messages to send. If the script was started
            with at least two command-line arguments, ``int(sys.argv[2])``
            replaces this value.

    Raises:
        ValueError: If ``sys.argv[2]`` is present but is not an integer.
    """
    import sys
    from qpid.client import Client
    from qpid.content import Content

    # The command line wins over the argument so that the timing harness and
    # the publisher always agree on the message count.
    if len(sys.argv) >= 3:
        n = int(sys.argv[2])

    client = Client("127.0.0.1", 5672)
    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
    channel = client.channel(1)
    channel.session_open()
    try:
        message = Content("message")
        message["routing_key"] = "message_queue"
        # Single-argument print keeps the exact text the Python 2 print
        # statement emitted (items separated by one extra space).
        print("producing  %s  messages" % (n,))
        for _ in range(n):
            channel.message_transfer(destination="amq.direct", content=message)

        print("producing final message")
        # The consumer stops reading when it sees this exact body.
        message = Content("That's done")
        message["routing_key"] = "message_queue"
        channel.message_transfer(destination="amq.direct", content=message)

        print("consuming sync message")
        destination = "consumer"
        queue = client.queue(destination)
        channel.message_subscribe(queue="sync_queue", destination=destination)
        # NOTE(review): units 0 and 1 are presumably the message and byte
        # credit units of AMQP 0-10 -- confirm against the qpid client docs.
        channel.message_flow(destination, 0, 0xFFFFFFFF)
        channel.message_flow(destination, 1, 0xFFFFFFFF)
        queue.get(block=True)
        print("done")
    finally:
        # Close the session even when a transfer or the blocking get fails.
        channel.session_close()
def consumer():
    """Drain "message_queue" until the terminator arrives, then signal the publisher.

    Connects to the broker at 127.0.0.1:5672 as guest/guest, subscribes to
    "message_queue" and reads messages until one whose body equals
    "That's done" is received. It then marks the last message complete with
    ``cumulative=True`` and publishes a single "message" to "amq.direct"
    with routing key "sync_queue".
    """
    import sys
    from qpid.client import Client
    from qpid.content import Content

    client = Client("127.0.0.1", 5672)
    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
    channel = client.channel(1)
    channel.session_open()
    try:
        destination = "consumer"
        queue = client.queue(destination)
        channel.message_subscribe(queue="message_queue", destination=destination)
        # NOTE(review): units 0 and 1 are presumably the message and byte
        # credit units of AMQP 0-10 -- confirm against the qpid client docs.
        channel.message_flow(destination, 0, 0xFFFFFFFF)
        channel.message_flow(destination, 1, 0xFFFFFFFF)

        final = "That's done"
        content = ""
        message = None
        print("getting messages")
        # ``content`` starts empty, so at least one message is always read.
        while content != final:
            message = queue.get(block=True)
            content = message.content.body

        # Only the last message is completed; cumulative=True presumably
        # covers every earlier message as well -- confirm with the client docs.
        message.complete(cumulative=True)
        print("consumed all messages")

        # Tell the publisher, which is blocked on "sync_queue", that we are done.
        message = Content("message")
        message["routing_key"] = "sync_queue"
        channel.message_transfer(destination="amq.direct", content=message)
        print("done")
    finally:
        # Close the session even when receiving or the sync transfer fails.
        channel.session_close()
if __name__ == '__main__':
    # Usage: <script> prod|cons [message_count]
    # Declares and binds the two queues the benchmark needs, then times one
    # run of either the publisher or the consumer.
    import sys
    import qpid
    from timeit import Timer
    from qpid.client import Client
    from qpid.content import Content

    client = Client("127.0.0.1", 5672)
    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
    channel = client.channel(1)
    channel.session_open()
    try:
        channel.queue_declare(queue="message_queue")
        channel.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="message_queue")
        channel.queue_declare(queue="sync_queue")
        channel.queue_bind(exchange="amq.direct", queue="sync_queue", routing_key="sync_queue")
    finally:
        # Release the setup session even if a declare or bind fails.
        channel.session_close()

    numMess = 100
    if len(sys.argv) >= 3:
        numMess = int(sys.argv[2])

    if len(sys.argv) == 1:
        print("error: please specify prod or cons")
    elif sys.argv[1] == 'prod':
        # Pass the real count instead of a hard-coded 100 so the timed call
        # matches the figure reported below.
        tprod = Timer("publisher(%d)" % numMess, "from __main__ import publisher")
        tp = tprod.timeit(1)
        # +2 accounts for the terminator message and the sync message.
        print("produced and consumed %s messages in:  %s" % (numMess + 2, tp))
    elif sys.argv[1] == 'cons':
        tcons = Timer("consumer()", "from __main__ import consumer")
        tc = tcons.timeit(1)
        print("consumed  %s  in:  %s" % (numMess, tc))
    else:
        print("please specify prod or cons")