blob: 300d34774fcc4cd202ded38487b5c96355d02f32 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import optparse, time, qpid.messaging, re
from threading import Thread
from subprocess import Popen, PIPE, STDOUT
# Command-line interface. Options declared with action="append" may be given
# several times; main() additionally splits each value on commas/whitespace.
op = optparse.OptionParser(usage="usage: %prog [options]",
                           description="simple performance benchmarks")

# --- Where to run ---------------------------------------------------------
op.add_option("-b", "--broker", default=[], action="append", type="str",
              help="url of broker(s) to connect to, round robin on multiple brokers")
op.add_option("-c", "--client-host", default=[], action="append", type="str",
              help="host(s) to run clients on via ssh, round robin on multiple hosts")

# --- Workload shape -------------------------------------------------------
op.add_option("-q", "--queues", default=1, type="int", metavar="N",
              help="create N queues (default %default)")
op.add_option("-s", "--senders", default=1, type="int", metavar="N",
              help="start N senders per queue (default %default)")
op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
              help="start N receivers per queue (default %default)")
op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
              help="send N messages per sender (default %default)")
op.add_option("--queue-name", default="benchmark", metavar="NAME",
              help="base name for queues (default %default)")
# The two rate options have no explicit type: a value given on the command
# line is kept as a string and forwarded verbatim to the client programs.
op.add_option("--send-rate", default=0, metavar="N",
              help="send rate limited to N messages/second, 0 means no limit (default %default)")
op.add_option("--receive-rate", default=0, metavar="N",
              help="receive rate limited to N messages/second, 0 means no limit (default %default)")
op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
              help="message size in bytes (default %default)")
op.add_option("--ack-frequency", default=100, metavar="N", type="int",
              help="receiver ack's every N messages, 0 means unconfirmed (default %default)")

# --- Reporting ------------------------------------------------------------
op.add_option("--no-report-header", dest="report_header", default=True,
              action="store_false", help="don't print header on report")
op.add_option("--summarize", default=False, action="store_true",
              help="print summary statistics for multiple senders/receivers: total throughput, average latency")
op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")

# --- Pass-through to qpid-send / qpid-receive -----------------------------
op.add_option("--send-option", default=[], action="append", type="str",
              help="Additional option for sending addresses")
op.add_option("--receive-option", default=[], action="append", type="str",
              help="Additional option for receiving addresses")
op.add_option("--send-arg", default=[], action="append", type="str",
              help="Additional argument for qpid-send")
op.add_option("--receive-arg", default=[], action="append", type="str",
              help="Additional argument for qpid-receive")
op.add_option("--no-timestamp", dest="timestamp", default=True,
              action="store_false", help="don't add a timestamp, no latency results")
op.add_option("--connection-options", type="str",
              help="Connection options for senders & receivers")
op.add_option("--flow-control", default=0, type="int", metavar="N",
              help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.")
op.add_option("--durable", default=False, action="store_true",
              help="Use durable queues and messages")
single_quote_re = re.compile("'")


def posix_quote(string):
    """Quote a string for use as a single argument in a POSIX shell.

    The string is wrapped in single quotes.  Inside single quotes a POSIX
    shell treats every character literally, including the backslash, so an
    embedded single quote cannot be backslash-escaped in place.  Each
    embedded quote is therefore emitted as '\\'' : close the quoted string,
    add an escaped quote, and reopen the quoted string.
    """
    # A callable replacement is used so the text is inserted verbatim and is
    # not interpreted as a regex replacement template.
    return "'" + single_quote_re.sub(lambda match: "'\\''", string) + "'"
def ssh_command(host, command):
    """Build the argument list that runs `command` on `host` through ssh.

    `command` is a list of arguments; each one is passed through
    posix_quote() before being appended after ["ssh", host].
    """
    remote = ["ssh", host]
    for word in command:
        remote.append(posix_quote(word))
    return remote
class Clients:
    """Keeps track of started client objects so they can all be killed."""

    def __init__(self):
        self.clients = []

    def add(self, client):
        """Remember `client` and hand it back to the caller."""
        self.clients.append(client)
        return client

    def kill(self):
        """Call kill() on every remembered client, ignoring any failure."""
        for started in self.clients:
            # Best effort: a failure to kill one client must not prevent
            # the attempt on the remaining ones.
            try:
                started.kill()
            except:
                pass


# Module-wide registry used by start_receive(), start_send() and main().
clients = Clients()
def start_receive(queue, index, opts, ready_queue, broker, host):
    """Start one qpid-receive process for `queue`.

    The messages expected on the queue (opts.senders * opts.messages) are
    divided between opts.receivers receivers; the receivers with the lowest
    `index` values each take one of the left-over messages.

    Returns the Popen object (also registered in `clients`), or None when
    this receiver's share is zero messages and nothing is started.
    When `host` is set the command is wrapped by ssh_command().
    """
    address_parts = ["create:receiver"] + opts.receive_option
    if opts.durable:
        address_parts.append("node:{durable:true}")
    address = "%s;{%s}" % (queue, ",".join(address_parts))

    share, leftover = divmod(opts.senders * opts.messages, opts.receivers)
    if index < leftover:
        share += 1
    if share == 0:
        return None

    command = ["qpid-receive"]
    command += ["-b", broker]
    command += ["-a", address]
    command += ["-m", str(share)]
    command += ["--forever", "--print-content=no"]
    command += ["--receive-rate", str(opts.receive_rate)]
    command += ["--report-total"]
    command += ["--ack-frequency", str(opts.ack_frequency)]
    command += ["--ready-address", ready_queue]
    command += ["--report-header=no"]
    command.extend(opts.receive_arg)
    if opts.connection_options:
        command.extend(["--connection-options", opts.connection_options])
    if host:
        command = ssh_command(host, command)
    process = Popen(command, stdout=PIPE)
    return clients.add(process)
def start_send(queue, opts, broker, host):
    """Start one qpid-send process for `queue`.

    The address is the queue name followed by the comma-joined
    opts.send_option values.  Returns the Popen object, which is also
    registered in `clients`.  When `host` is set the command is wrapped by
    ssh_command().
    """
    address = "%s;{%s}" % (queue, ",".join(opts.send_option))
    if opts.timestamp:
        timestamp_flag = "yes"
    else:
        timestamp_flag = "no"

    command = ["qpid-send"]
    command += ["-b", broker]
    command += ["-a", address]
    command += ["--messages", str(opts.messages)]
    command += ["--content-size", str(opts.content_size)]
    command += ["--send-rate", str(opts.send_rate)]
    command += ["--report-total", "--report-header=no"]
    command += ["--timestamp=%s" % (timestamp_flag)]
    command += ["--sequence=no"]
    command += ["--flow-control", str(opts.flow_control)]
    # opts.durable is a bool, so the value passed is "True" or "False".
    command += ["--durable", str(opts.durable)]
    command.extend(opts.send_arg)
    if opts.connection_options:
        command.extend(["--connection-options", opts.connection_options])
    if host:
        command = ssh_command(host, command)
    process = Popen(command, stdout=PIPE)
    return clients.add(process)
def first_line(p):
    """Wait for process `p` to finish and return the first line it printed.

    Raises Exception carrying the stripped output when the process exits
    with a non-zero return code.
    """
    output, unused_err = p.communicate()
    if p.returncode == 0:
        return output.split("\n", 1)[0]
    raise Exception("Process failed: %s"%(output.strip()))
def delete_queues(queues, broker):
    """Delete each queue in `queues` on `broker`; missing queues are ignored.

    A sender is opened on every queue with the "delete:always" address
    option and closed again.  A fresh session is used for each queue.  The
    connection is closed even if an error other than NotFound is raised.
    """
    c = qpid.messaging.Connection(broker)
    c.open()
    try:
        for q in queues:
            try:
                s = c.session()
                snd = s.sender("%s;{delete:always}"%(q))
                snd.close()
                s.sync()
            except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
    finally:
        # Do not leave the connection open when an unexpected error
        # propagates out of the loop.
        c.close()
def print_header(timestamp):
    """Print the column header line of the report.

    The three latency columns are included only when `timestamp` is true.
    """
    if timestamp:
        latency_header = "\tl-min\tl-max\tl-avg"
    else:
        latency_header = ""
    # Single-argument print(...) is valid both as a Python 2 print statement
    # and as a Python 3 function call.
    print("send-tp\t\trecv-tp%s" % latency_header)
def parse(parser, lines):
    """Parse sender/receiver output lines into lists of typed values.

    `parser` is a sequence of converter callables (e.g. [int, float]).
    Each line is split on whitespace and the i-th field is passed to the
    i-th converter; fields or converters without a partner are ignored.
    Returns one list of converted values per input line.
    """
    return [[convert(field) for convert, field in zip(parser, line.split())]
            for line in lines]
def parse_senders(senders):
    """Collect the first output line of every sender and parse it as one int."""
    reports = []
    for process in senders:
        reports.append(first_line(process))
    return parse([int], reports)
def parse_receivers(receivers):
    """Collect and parse the first output line of every started receiver.

    Falsy entries in `receivers` (start_receive() returns None when it does
    not start a process) are skipped.  Each line is parsed as an int
    followed by up to three floats.
    """
    converters = [int, float, float, float]
    reports = []
    for process in receivers:
        if process:
            reports.append(first_line(process))
    return parse(converters, reports)
def print_data(send_stats, recv_stats):
    """Print one report line per sender/receiver pair.

    The i-th sender is paired with the i-th receiver; when one list is
    shorter, the missing side is left blank.  A receiver row with four
    values also prints its three latency figures.
    """
    send_stats = list(send_stats)
    recv_stats = list(recv_stats)
    # Explicit padding replaces the Python-2-only map(None, a, b) idiom.
    for row in range(max(len(send_stats), len(recv_stats))):
        send = None
        if row < len(send_stats):
            send = send_stats[row]
        recv = None
        if row < len(recv_stats):
            recv = recv_stats[row]
        line = ""
        if send:
            line += "%d" % send[0]
        if recv:
            line += "\t\t%d" % recv[0]
            if len(recv) == 4:
                line += "\t%.2f\t%.2f\t%.2f" % tuple(recv[1:])
        # Single-argument print(...) works on both Python 2 and Python 3.
        print(line)
def print_summary(send_stats, recv_stats):
    """Print a single line summarizing all senders and receivers.

    The first value of every sender row and of every receiver row is summed
    into the two throughput columns.  When the receiver rows carry four
    values, columns 1-3 (the latency figures) are each averaged over all
    receivers and appended.
    """
    def avg(values):
        return sum(values) / len(values)

    send_tp = sum([row[0] for row in send_stats])
    recv_tp = sum([row[0] for row in recv_stats])
    summary = "%d\t\t%d" % (send_tp, recv_tp)
    # The width of the first receiver row decides whether latency data exists.
    if recv_stats and len(recv_stats[0]) == 4:
        l_min = avg([row[1] for row in recv_stats])
        l_max = avg([row[2] for row in recv_stats])
        l_avg = avg([row[3] for row in recv_stats])
        summary += "\t%.2f\t%.2f\t%.2f" % (l_min, l_max, l_avg)
    # Single-argument print(...) works on both Python 2 and Python 3.
    print(summary)
class ReadyReceiver:
    """Receives the "ready" messages on a dedicated queue."""

    def __init__(self, queue, broker):
        """Delete any existing `queue` on `broker`, then open a receiver on it.

        The receiver address asks for the queue to be created, and deleted
        again, by the receiver, and to be non-durable.
        """
        delete_queues([queue], broker)
        self.connection = qpid.messaging.Connection(broker)
        self.connection.open()
        ready_address = "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)
        session = self.connection.session()
        self.receiver = session.receiver(ready_address)
        self.receiver.session.sync()
        # Timeout passed to every fetch() call in wait().
        self.timeout = 10

    def wait(self, receivers):
        """Fetch one ready message per entry in `receivers`, then close the connection.

        If a fetch raises qpid.messaging.Empty, the first receiver process
        found to have exited is reported with its output; if none has
        exited, a timeout error is raised instead.
        """
        try:
            for unused in receivers:
                self.receiver.fetch(self.timeout)
            self.connection.close()
        except qpid.messaging.Empty:
            for process in receivers:
                if process.poll() is None:
                    continue  # still running, not the culprit
                out, err = process.communicate()
                raise Exception("Receiver error: %s"%(out))
            raise Exception("Timed out waiting for receivers to be ready")
def flatten(l):
    """Split every string in `l` on commas and/or whitespace.

    Returns a single flat list holding the pieces of all strings in order.
    """
    separator = re.compile(r"\s*,\s*|\s+")
    pieces = []
    for text in l:
        pieces.extend(re.split(separator, text))
    return pieces
class RoundRobin:
    """Hands out the entries of a list one after another, wrapping around."""

    def __init__(self, items):
        self.items = items
        self.index = 0  # position of the entry returned by the next call

    def next(self):
        """Return the current entry and advance; None when there are no items."""
        if self.items:
            current = self.items[self.index]
            self.index = (self.index + 1) % len(self.items)
            return current
        return None
def main():
    """Parse the command line and run the benchmark opts.repeat times.

    Each round: delete the benchmark queues, start the receivers and wait
    until they report ready, start the senders, then collect and print the
    statistics each client reported.  Brokers and client hosts are assigned
    to clients round-robin.  All started clients are killed on exit.
    """
    opts, args = op.parse_args()
    if not opts.broker: opts.broker = ["127.0.0.1"] # Default to local broker
    opts.broker = flatten(opts.broker)
    opts.client_host = flatten(opts.client_host)
    brokers = RoundRobin(opts.broker)
    client_hosts = RoundRobin(opts.client_host)
    ready_queue = "%s-ready"%(opts.queue_name)
    queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
    try:
        for i in xrange(opts.repeat):
            # NOTE(review): queues are always deleted through the first
            # broker even when clients are spread over several brokers --
            # presumably the brokers share queue state; confirm.
            delete_queues(queues, opts.broker[0])
            ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
            receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
                         for q in queues for j in xrange(opts.receivers)]
            # start_receive() returns None for receivers it did not start.
            ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
            senders = [start_send(q, opts, brokers.next(), client_hosts.next())
                       for q in queues for j in xrange(opts.senders)]
            if opts.report_header and i == 0: print_header(opts.timestamp)
            send_stats = parse_senders(senders)
            recv_stats = parse_receivers(receivers)
            if opts.summarize: print_summary(send_stats, recv_stats)
            else: print_data(send_stats, recv_stats)
            delete_queues(queues, opts.broker[0])
    finally: clients.kill() # No strays


if __name__ == "__main__": main()