| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import optparse, time, qpid.messaging, re |
| from threading import Thread |
| from subprocess import Popen, PIPE, STDOUT |
| |
# Command-line interface of the benchmark. Each option becomes an attribute
# of the values object returned by op.parse_args(); options declared with
# action="append" may be given several times and accumulate into a list.
op = optparse.OptionParser(usage="usage: %prog [options]",
                           description="simple performance benchmarks")
op.add_option("-b", "--broker", default=[], action="append", type="str",
              help="url of broker(s) to connect to, round robin on multiple brokers")
op.add_option("-c", "--client-host", default=[], action="append", type="str",
              help="host(s) to run clients on via ssh, round robin on multiple hosts")
op.add_option("-q", "--queues", default=1, type="int", metavar="N",
              help="create N queues (default %default)")
op.add_option("-s", "--senders", default=1, type="int", metavar="N",
              help="start N senders per queue (default %default)")
op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
              help="start N receivers per queue (default %default)")
op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
              help="send N messages per sender (default %default)")
op.add_option("--queue-name", default="benchmark", metavar="NAME",
              help="base name for queues (default %default)")
# The two rate options declare no type, so a value given on the command line
# is kept as a string; it is only ever passed through str() to the clients.
op.add_option("--send-rate", default=0, metavar="N",
              help="send rate limited to N messages/second, 0 means no limit (default %default)")
op.add_option("--receive-rate", default=0, metavar="N",
              help="receive rate limited to N messages/second, 0 means no limit (default %default)")
op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
              help="message size in bytes (default %default)")
op.add_option("--ack-frequency", default=100, metavar="N", type="int",
              help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
op.add_option("--no-report-header", dest="report_header", default=True,
              action="store_false", help="don't print header on report")
op.add_option("--summarize", default=False, action="store_true",
              help="print summary statistics for multiple senders/receivers: total throughput, average latency")
op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")
op.add_option("--send-option", default=[], action="append", type="str",
              help="Additional option for sending addresses")
op.add_option("--receive-option", default=[], action="append", type="str",
              help="Additional option for receiving addresses")
op.add_option("--send-arg", default=[], action="append", type="str",
              help="Additional argument for qpid-send")
op.add_option("--receive-arg", default=[], action="append", type="str",
              help="Additional argument for qpid-receive")
op.add_option("--no-timestamp", dest="timestamp", default=True,
              action="store_false", help="don't add a timestamp, no latency results")
op.add_option("--connection-options", type="str",
              help="Connection options for senders & receivers")
op.add_option("--flow-control", default=0, type="int", metavar="N",
              help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.")
op.add_option("--durable", default=False, action="store_true",
              help="Use durable queues and messages")
| |
# Matches each single quote that posix_quote() has to escape.
single_quote_re = re.compile("'")

def posix_quote(string):
    """Quote a string for use as a single argument in a posix shell."""
    # Inside single quotes a POSIX shell treats backslash literally, so a
    # quote cannot be escaped in place. Instead each embedded quote closes
    # the quoted section, adds a backslash-escaped quote, and reopens the
    # quoted section. A function is used as the replacement so the re
    # module does not interpret the backslash itself.
    return "'" + single_quote_re.sub(lambda match: "'\\''", string) + "'"
| |
def ssh_command(host, command):
    """Return the argument list that runs command on host through ssh.

    command is a sequence of argument strings; each one is shell-quoted
    with posix_quote() before being appended after "ssh" and host.
    """
    remote = ["ssh", host]
    for word in command:
        remote.append(posix_quote(word))
    return remote
| |
class Clients:
    """Registry of started client objects so that all of them can be killed."""

    def __init__(self):
        self.clients = []

    def add(self, client):
        """Remember client and hand the same object back to the caller."""
        self.clients += [client]
        return client

    def kill(self):
        """Call kill() on every registered client, ignoring any failure."""
        for started in self.clients:
            try:
                started.kill()
            except:
                # Best-effort cleanup: whatever goes wrong with one client,
                # carry on with the remaining ones.
                continue

# Module-wide registry used by start_receive(), start_send() and main().
clients = Clients()
| |
def start_receive(queue, index, opts, ready_queue, broker, host):
    """Start one qpid-receive client for queue and register it in clients.

    index is this receiver's position among the receivers of the queue.
    The opts.senders * opts.messages messages of the queue are divided
    between opts.receivers receivers; the first (total % receivers) of
    them take one extra message each. ready_queue is handed to the client
    as --ready-address. If host is set the command is run there via ssh.

    Returns the Popen object (stdout piped), or None without starting
    anything when this receiver's share is zero messages.
    """
    address_parts = ["create:receiver"]
    address_parts.extend(opts.receive_option)
    if opts.durable:
        address_parts.append("node:{durable:true}")
    address = "%s;{%s}" % (queue, ",".join(address_parts))

    total = opts.senders * opts.messages
    messages, leftover = divmod(total, opts.receivers)
    if index < leftover:
        messages += 1
    if messages == 0:
        return None

    command = ["qpid-receive", "-b", broker, "-a", address, "-m", str(messages)]
    command += ["--forever", "--print-content=no"]
    command += ["--receive-rate", str(opts.receive_rate), "--report-total"]
    command += ["--ack-frequency", str(opts.ack_frequency)]
    command += ["--ready-address", ready_queue, "--report-header=no"]
    command.extend(opts.receive_arg)
    if opts.connection_options:
        command.extend(["--connection-options", opts.connection_options])
    if host:
        command = ssh_command(host, command)
    process = Popen(command, stdout=PIPE)
    return clients.add(process)
| |
def start_send(queue, opts, broker, host):
    """Start one qpid-send client for queue and register it in clients.

    The client is told to send opts.messages messages of opts.content_size
    bytes to queue on broker. If host is set the command is run there via
    ssh. Returns the Popen object with stdout piped.
    """
    timestamp_flag = "yes" if opts.timestamp else "no"
    address = "%s;{%s}" % (queue, ",".join(opts.send_option))

    command = ["qpid-send", "-b", broker, "-a", address]
    command += ["--messages", str(opts.messages)]
    command += ["--content-size", str(opts.content_size)]
    command += ["--send-rate", str(opts.send_rate)]
    command += ["--report-total", "--report-header=no"]
    command += ["--timestamp=%s" % timestamp_flag, "--sequence=no"]
    command += ["--flow-control", str(opts.flow_control)]
    # str() of the boolean: the client receives the text "True" or "False".
    command += ["--durable", str(opts.durable)]
    command.extend(opts.send_arg)
    if opts.connection_options:
        command.extend(["--connection-options", opts.connection_options])
    if host:
        command = ssh_command(host, command)
    process = Popen(command, stdout=PIPE)
    return clients.add(process)
| |
def first_line(p):
    """Wait for process p to finish and return the first line of its output.

    Raises Exception, carrying the stripped output, when p exits with a
    non-zero return code.
    """
    output, _unused = p.communicate()
    if p.returncode != 0:
        raise Exception("Process failed: %s" % (output.strip()))
    head, _sep, _rest = output.partition("\n")
    return head
| |
def delete_queues(queues, broker):
    """Delete each of the named queues on broker, ignoring missing ones.

    A connection to broker is opened for the duration of the call and is
    closed again even if deleting a queue raises an unexpected error.
    """
    c = qpid.messaging.Connection(broker)
    c.open()
    try:
        for q in queues:
            try:
                # A fresh session is used for every queue.
                # NOTE(review): presumably because a session cannot be reused
                # after it has reported NotFound -- confirm.
                s = c.session()
                # Opening and closing a sender on an address carrying the
                # delete:always policy is what removes the queue.
                snd = s.sender("%s;{delete:always}"%(q))
                snd.close()
                s.sync()
            except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
    finally:
        c.close()
| |
def print_header(timestamp):
    """Print the column titles of the report.

    The three latency columns are included only when timestamp is true.
    """
    latency_header = "\tl-min\tl-max\tl-avg" if timestamp else ""
    print("send-tp\t\trecv-tp%s" % latency_header)
| |
def parse(parser, lines):
    """Parse sender/receiver output lines into lists of typed values.

    parser is a sequence of one-argument converters (for example int or
    float), one per column. Each line is split on whitespace and converter
    i is applied to field i. Converters and fields are paired with zip(),
    so surplus fields or surplus converters are ignored.

    Returns a list holding one list of converted values per line.
    """
    return [[convert(field) for convert, field in zip(parser, line.split())]
            for line in lines]
| |
def parse_senders(senders):
    """Collect the first output line of every sender and parse it.

    Each line is parsed as a single int column. Returns one list of
    values per sender.
    """
    reports = []
    for proc in senders:
        reports.append(first_line(proc))
    return parse([int], reports)
| |
def parse_receivers(receivers):
    """Collect the first output line of every started receiver and parse it.

    Falsy entries (receivers that were never started) are skipped. Each
    line is parsed as an int column followed by up to three float columns.
    Returns one list of values per started receiver.
    """
    reports = []
    for proc in receivers:
        if proc:
            reports.append(first_line(proc))
    return parse([int, float, float, float], reports)
| |
def print_data(send_stats, recv_stats):
    """Print one report row per sender/receiver pair.

    Row i shows the first value of send_stats[i], then the first value of
    recv_stats[i], then the receiver's three latency values when it has
    exactly four values. The shorter list is padded, so a row may show
    only a sender or only a receiver.
    """
    # Pad the shorter list with None; multiplying a list by a negative
    # number gives an empty list, so only one side actually grows.
    shortfall = len(send_stats) - len(recv_stats)
    padded_send = list(send_stats) + [None] * (-shortfall)
    padded_recv = list(recv_stats) + [None] * shortfall
    for send, recv in zip(padded_send, padded_recv):
        pieces = []
        if send:
            pieces.append("%d" % send[0])
        if recv:
            pieces.append("\t\t%d" % recv[0])
            if len(recv) == 4:
                pieces.append("\t%.2f\t%.2f\t%.2f" % tuple(recv[1:]))
        print("".join(pieces))
| |
def print_summary(send_stats, recv_stats):
    """Print a single summary row for all senders and receivers.

    The row holds the sum of the first value of every sender row and the
    sum of the first value of every receiver row. When the first receiver
    row has four values, three more columns follow: the averages over all
    receivers of columns 1, 2 and 3. The l-min and l-max columns are
    therefore averages of the per-receiver values, not overall extremes.
    """
    def avg(values):
        return sum(values) / len(values)

    send_tp = sum(row[0] for row in send_stats)
    recv_tp = sum(row[0] for row in recv_stats)
    summary = "%d\t\t%d" % (send_tp, recv_tp)
    if recv_stats and len(recv_stats[0]) == 4:
        latencies = tuple(avg([row[column] for row in recv_stats])
                          for column in (1, 2, 3))
        summary += "\t%.2f\t%.2f\t%.2f" % latencies
    print(summary)
| |
| |
class ReadyReceiver:
    """A receiver for ready messages"""
    def __init__(self, queue, broker):
        """Open a connection to broker and a receiver on a fresh queue."""
        # Remove any queue of this name left on the broker.
        # NOTE(review): presumably so that ready messages from an earlier
        # run are not counted -- confirm.
        delete_queues([queue], broker)
        self.connection = qpid.messaging.Connection(broker)
        self.connection.open()
        self.receiver = self.connection.session().receiver(
            "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
        self.receiver.session.sync()
        # Timeout passed to every fetch() call (presumably seconds).
        self.timeout=10

    def wait(self, receivers):
        """Fetch one ready message per entry of receivers.

        receivers must be a re-iterable sequence of started processes.
        Raises Exception if a fetch times out: "Receiver error" with the
        output of the first receiver found to have exited, otherwise a
        timeout error. The connection is closed in every case.
        """
        try:
            for _ in receivers: self.receiver.fetch(self.timeout)
        except qpid.messaging.Empty:
            # Nothing arrived in time: report a receiver that has already
            # exited if there is one, since its output explains the failure.
            for r in receivers:
                if (r.poll() is not None):
                    out,err=r.communicate()
                    raise Exception("Receiver error: %s"%(out))
            raise Exception("Timed out waiting for receivers to be ready")
        finally:
            self.connection.close()
| |
# Item separator: a comma with optional surrounding whitespace, or a run of
# whitespace.
_separator_re = re.compile(r"\s*,\s*|\s+")

def flatten(l):
    """Split every string in l on commas/whitespace into one flat list.

    Returns a new list with the pieces of all strings in order. Empty
    pieces (for example from leading or trailing separators) are kept.
    """
    return [piece for s in l for piece in _separator_re.split(s)]
| |
class RoundRobin:
    """Hand out the elements of items one after another, wrapping around."""

    def __init__(self, items):
        self.index = 0
        self.items = items

    def next(self):
        """Return the current item and advance; None when items is empty."""
        pool = self.items
        if not pool:
            return None
        position = self.index
        self.index = (position + 1) % len(pool)
        return pool[position]
| |
def main():
    """Run the benchmark described by the command line and print the report.

    For each of opts.repeat rounds: the benchmark queues are deleted, the
    receivers are started and waited for, the senders are started, and the
    parsed results are printed either per client or as one summary row.
    Brokers and client hosts are handed out round robin. All started
    clients are killed on the way out, whether or not the run succeeded.
    """
    opts, args = op.parse_args()
    if not opts.broker: opts.broker = ["127.0.0.1"]  # Default to local broker
    opts.broker = flatten(opts.broker)
    opts.client_host = flatten(opts.client_host)
    brokers = RoundRobin(opts.broker)
    client_hosts = RoundRobin(opts.client_host)
    ready_queue = "%s-ready" % (opts.queue_name)
    queues = ["%s-%s" % (opts.queue_name, i) for i in range(opts.queues)]
    try:
        for i in range(opts.repeat):
            # Queue management always goes through the first broker.
            delete_queues(queues, opts.broker[0])
            ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
            receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
                         for q in queues for j in range(opts.receivers)]
            # start_receive() returns None for receivers with nothing to do;
            # only the started ones will report ready.
            ready_receiver.wait([r for r in receivers if r])  # Wait for receivers to be ready.
            senders = [start_send(q, opts, brokers.next(), client_hosts.next())
                       for q in queues for _ in range(opts.senders)]
            if opts.report_header and i == 0: print_header(opts.timestamp)
            send_stats = parse_senders(senders)
            recv_stats = parse_receivers(receivers)
            if opts.summarize: print_summary(send_stats, recv_stats)
            else: print_data(send_stats, recv_stats)
            delete_queues(queues, opts.broker[0])
    finally:
        clients.kill()  # No strays


if __name__ == "__main__":
    main()
| |