| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
import optparse
import os
import random
import sys
import time
import uuid

from qpid.messaging import *

import statistics
| |
# Keys of the message properties this tool sets: the end-of-stream marker,
# the sequence number and the send timestamp.
EOS, SN, TS = "eos", "sn", "ts"

# Scale factor applied to time.time() (seconds) for the send-rate and
# timestamp arithmetic.
TIME_SEC = 10 ** 9
# Timeout value handed to the flow-control receiver's fetch().
SECOND = 10 ** 3
| |
def nameval(st):
    """Split a "name=value" string at its first "=".

    Returns a (name, value) tuple.  When the string contains no "=", the
    whole string is the name and the value is None.
    """
    name, separator, value = st.partition("=")
    if not separator:
        # No "=" present: everything is the name and there is no value.
        return st, None
    return name, value
| |
| |
# Command-line interface.  The parser is built at import time; main() calls
# op.parse_args() to obtain the options.
op = optparse.OptionParser(
    usage="usage: %prog [options]",
    description="Spouts messages to the specified address")

# --- connection and addressing ---
op.add_option("-b", "--broker", default="localhost:5672", type="str",
              help="url of broker to connect to")
op.add_option("-a", "--address", type="str",
              help="address to send to")
op.add_option("--connection-options", default={},
              help="options for the connection")

# --- message count and headers ---
op.add_option("-m", "--messages", default=1, type="int",
              help="stop after N messages have been sent, 0 means no limit")
op.add_option("-i", "--id", type="str",
              help="use the supplied id instead of generating one")
op.add_option("--reply-to", type="str",
              help="specify reply-to address")
op.add_option("--send-eos", default=0, type="int",
              help="Send N EOS messages to mark end of input")
op.add_option("--durable", default=False, action="store_true",
              help="Mark messages as durable")
op.add_option("--ttl", default=0, type="int",
              help="Time-to-live for messages, in milliseconds")
op.add_option("--priority", default=0, type="int",
              help="Priority for messages (higher value implies higher priority)")
op.add_option("-P", "--property", default=[], action="append", type="str",
              help="specify message property")
op.add_option("--correlation-id", type="str",
              help="correlation-id for message")
op.add_option("--user-id", type="str",
              help="userid for message")

# --- message content ---
op.add_option("--content-string", type="str",
              help="use CONTENT as message content")
op.add_option("--content-size", default=0, type="int",
              help="create an N-byte message content")
op.add_option("-M", "--content-map", default=[], action="append", type="str",
              help="specify entry for map content")
op.add_option("--content-stdin", default=False, action="store_true",
              help="read message content from stdin, one line per message")

# --- sender behaviour and transactions ---
op.add_option("--capacity", default=1000, type="int",
              help="size of the senders outgoing message queue")
op.add_option("--tx", default=0, type="int",
              help="batch size for transactions (0 implies transaction are not used)")
op.add_option("--rollback-frequency", default=0, type="int",
              help="rollback frequency (0 implies no transaction will be rolledback)")
op.add_option("--failover-updates", default=False, action="store_true",
              help="Listen for membership updates distributed via amq.failover")

# --- reporting, pacing and per-message bookkeeping ---
op.add_option("--report-total", default=False, action="store_true",
              help="Report total throughput statistics")
op.add_option("--report-every", default=0, type="int",
              help="Report throughput statistics every N messages")
op.add_option("--report-header", type="str", default="yes",
              help="Headers on report")
op.add_option("--send-rate", default=0, type="int",
              help="Send at rate of N messages/second. 0 means send as fast as possible")
op.add_option("--flow-control", default=0, type="int",
              help="Do end to end flow control to limit queue depth to 2*N. 0 means no flow control.")
op.add_option("--sequence", type="str", default="yes",
              help="Add a sequence number messages property (required for duplicate/lost message detection)")
op.add_option("--timestamp", type="str", default="yes",
              help="Add a time stamp messages property (required for latency measurement)")

# --- message groups ---
op.add_option("--group-key", type="str",
              help="Generate groups of messages using message header 'KEY' to hold the group identifier")
op.add_option("--group-prefix", default="GROUP-", type="str",
              help="Generate group identifers with 'STRING' prefix (if group-key specified)")
op.add_option("--group-size", default=10, type="int",
              help="Number of messages per a group (if group-key specified)")
op.add_option("--group-randomize-size", default=False, action="store_true",
              help="Randomize the number of messages per group to [1...group-size] (if group-key specified)")
op.add_option("--group-interleave", default=1, type="int",
              help="Simultaineously interleave messages from N different groups (if group-key specified)")
| |
| |
class ContentGenerator:
    """Base class for the objects that fill in a message's content."""

    def setContent(self, msg):
        """Leave msg untouched and return None; subclasses override this."""
        return None
| |
class GetlineContentGenerator(ContentGenerator):
    """Content generator that takes each message body from standard input."""

    def setContent(self, msg):
        """Read the next line of stdin into msg.content.

        Returns True when a line was read and assigned, or False at end of
        input, in which case msg is left unchanged.
        """
        line = sys.stdin.readline()
        # readline() returns "" only at end of input; a blank line still
        # arrives as "\n", so it counts as content.  The line is stored
        # exactly as read, including its terminator.
        got = (line != "")
        if got:
            msg.content = line
        return got
| |
class FixedContentGenerator(ContentGenerator):
    """Content generator that gives every message the same content."""

    def __init__(self, content=None):
        # Value that setContent() assigns to each message.
        self.content = content

    def setContent(self, msg):
        """Assign the stored content to msg.content; always returns True."""
        payload = self.content
        msg.content = payload
        return True
| |
class MapContentGenerator(ContentGenerator):
    """Content generator that builds map content from the content_map option."""

    def __init__(self, opts=None):
        # Parsed options; setContent() reads opts.content_map, a list of
        # "name=value" strings.
        self.opts = opts

    def setContent(self, msg):
        """Build a fresh dict from opts.content_map and assign it to msg.content.

        Each entry is split with nameval(); an entry without "=" maps its
        name to None.  Always returns True.
        """
        self.content = {}
        for entry in self.opts.content_map:
            name, val = nameval(entry)
            self.content[name] = val
        msg.content = self.content
        return True
| |
| |
# Tag each generated message with a group identifier.
class GroupGenerator:
    """Assign group identifiers to outgoing messages.

    A fixed number of groups is kept active at once.  Successive messages
    are dealt to the active groups in turn; when a group has received its
    full number of messages it is retired and replaced by a new group with
    the next numeric suffix.
    """

    def __init__(self, key, prefix, size, randomize, interleave):
        """Create the generator and its initial set of active groups.

        key        -- name of the message property that carries the group id
        prefix     -- string prepended to the numeric suffix of each group id
        size       -- number of messages per group
        randomize  -- when true, each group's size is drawn from [1, size]
        interleave -- number of groups active at once (at least one is
                      always created)
        """
        self.groupKey = key
        self.groupPrefix = prefix
        self.groupSize = size
        self.randomizeSize = randomize
        self.groupSuffix = 0
        self.groups = []
        self.current = 0
        if randomize:
            random.seed(os.getpid())

        # Always start with at least one group so setGroupInfo() has
        # something to index even when interleave is 0 or negative.
        for _ in range(max(1, interleave)):
            self.newGroup()

    def setGroupInfo(self, msg):
        """Stamp msg.properties[key] with the id of the next active group."""
        if self.current >= len(self.groups):
            self.current = 0
        my_group = self.groups[self.current]
        msg.properties[self.groupKey] = my_group['id']
        my_group['count'] += 1
        # ">=" rather than "==" so a group size of 0 behaves like 1 instead
        # of never completing.
        if my_group['count'] >= my_group['size']:
            # Retire the finished group.  Its replacement is appended at the
            # end, and the group that slides into this index is used next.
            self.newGroup()
            del self.groups[self.current]
        else:
            self.current += 1

    def newGroup(self):
        """Append a new empty group with the next sequential identifier."""
        groupId = "%s%s" % (self.groupPrefix, self.groupSuffix)
        self.groupSuffix += 1
        size = self.groupSize
        # randint(1, n) needs n >= 1; for sizes of 1 or less there is
        # nothing to randomize.
        if self.randomizeSize and size > 1:
            size = random.randint(1, size)
        self.groups.append({'id': groupId, 'size': size, 'count': 0})
| |
| |
| |
def _select_content_generator(opts):
    """Choose the content generator implied by the parsed options.

    Returns (generator, placeholder), where placeholder is the initial
    content used to construct the Message ({} for map content, "" otherwise).
    Sets opts.messages to 0 when content is read from stdin so that the
    number of messages is not limited.
    """
    if opts.content_stdin:
        opts.messages = 0  # Don't limit number of messages sent.
        return GetlineContentGenerator(), ""
    # --content-map defaults to [] and --content-size to 0, so these are
    # tested for an actual value rather than "is not None"; otherwise the
    # later alternatives could never be selected.
    if opts.content_map:
        return MapContentGenerator(opts), {}
    if opts.content_size > 0:
        return FixedContentGenerator('X' * opts.content_size), ""
    content_string = opts.content_string
    if content_string is None:
        content_string = ""
    return FixedContentGenerator(content_string), ""


def _finish_tx_batch(session, opts, sent, txCount):
    """Commit or roll back when `sent` completes a transaction batch.

    Does nothing unless transactions are enabled (opts.tx > 0) and sent is a
    multiple of opts.tx.  Every opts.rollback_frequency-th batch is rolled
    back instead of committed.  Returns the updated batch count.
    """
    if opts.tx > 0 and sent % opts.tx == 0:
        txCount += 1
        if opts.rollback_frequency > 0 and txCount % opts.rollback_frequency == 0:
            session.rollback()
        else:
            session.commit()
    return txCount


def main():
    """Parse the command line and send messages to the requested address.

    Raises Exception when no address is given.  Any exception raised while
    connected is printed, and the connection is closed before returning.
    """
    opts, args = op.parse_args()
    if not opts.address:
        raise Exception("Address must be specified!")

    # NOTE(review): --connection-options declares no type, so a value given
    # on the command line arrives as a string and cannot be expanded with **;
    # only the default {} works here.  Confirm the intended format.
    connection = Connection(opts.broker, **opts.connection_options)

    try:
        connection.open()
        if opts.failover_updates:
            # Imported here because the file's top-level imports do not
            # name this helper explicitly.
            from qpid.messaging.util import auto_fetch_reconnect_urls
            auto_fetch_reconnect_urls(connection)
        session = connection.session(transactional=(opts.tx))
        sender = session.sender(opts.address)
        if opts.capacity > 0:
            sender.capacity = opts.capacity
        sent = 0
        txCount = 0
        stats = statistics.Throughput()
        reporter = statistics.Reporter(opts.report_every, opts.report_header == "yes", stats)

        contentGen, content = _select_content_generator(opts)
        groupGen = None
        if opts.group_key is not None:
            groupGen = GroupGenerator(opts.group_key, opts.group_prefix, opts.group_size,
                                      opts.group_randomize_size, opts.group_interleave)

        # One Message object is reused for every send; only its content and
        # per-message properties change between iterations.
        msg = Message(content=content)
        msg.durable = opts.durable
        if opts.ttl:
            msg.ttl = opts.ttl / 1000.0  # option is in milliseconds
        if opts.priority:
            msg.priority = opts.priority
        if opts.reply_to is not None:
            if opts.flow_control > 0:
                raise Exception("Can't use reply-to and flow-control together")
            msg.reply_to = opts.reply_to
        if opts.user_id is not None:
            msg.user_id = opts.user_id
        if opts.correlation_id is not None:
            msg.correlation_id = opts.correlation_id
        for p in opts.property:
            name, val = nameval(p)
            msg.properties[name] = val

        # Pacing is computed in units of 1/TIME_SEC seconds.
        start = time.time() * TIME_SEC
        interval = 0
        if opts.send_rate > 0:
            interval = TIME_SEC / opts.send_rate

        flowControlAddress = "flow-" + str(uuid.uuid1()) + ";{create:always,delete:always}"
        flowSent = 0
        if opts.flow_control > 0:
            flowControlReceiver = session.receiver(flowControlAddress)
            flowControlReceiver.capacity = 2

        while contentGen.setContent(msg):
            sent += 1
            if opts.sequence == "yes":
                msg.properties[SN] = sent

            if opts.flow_control > 0:
                if sent % opts.flow_control == 0:
                    # Every N-th message asks for a reply on the flow
                    # control address.
                    msg.reply_to = flowControlAddress
                    flowSent += 1
                else:
                    msg.reply_to = ""  # Clear the reply address.

            if groupGen is not None:
                groupGen.setGroupInfo(msg)

            if opts.timestamp == "yes":
                msg.properties[TS] = int(time.time() * TIME_SEC)
            sender.send(msg)
            reporter.message(msg)

            txCount = _finish_tx_batch(session, opts, sent, txCount)
            if opts.messages > 0 and sent >= opts.messages:
                break

            if opts.flow_control > 0 and flowSent == 2:
                # Two replies are outstanding: wait for one before sending
                # more.
                # NOTE(review): SECOND is 1000; confirm the unit fetch()
                # expects for its timeout.
                flowControlReceiver.fetch(timeout=SECOND)
                flowSent -= 1

            if opts.send_rate > 0:
                delay = start + sent * interval - time.time() * TIME_SEC
                if delay > 0:
                    # delay is in 1/TIME_SEC units; time.sleep() takes seconds.
                    time.sleep(delay / float(TIME_SEC))

        # Collect the flow-control replies that are still outstanding.
        while flowSent > 0:
            flowControlReceiver.fetch(timeout=SECOND)
            flowSent -= 1

        if opts.report_total:
            reporter.report()
        for _ in range(opts.send_eos):
            if opts.sequence == "yes":
                sent += 1
                msg.properties[SN] = sent
            msg.properties[EOS] = True  # TODO (also in C++ client): add in ability to send digest or similar
            sender.send(msg)
        txCount = _finish_tx_batch(session, opts, sent, txCount)
        session.sync()
        session.close()
    except Exception as e:
        print(e)
    finally:
        connection.close()
| |
# Run the sender only when this file is executed as a script.
if __name__ == "__main__":
    main()