blob: 2e2583fdbe00169bff182b2abab55a7652656c3d [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
import sys, optparse, time
import logging
from proton import *
# Hi python3!
try:
long()
except:
long = int
usage = """
Usage: msgr-send [OPTIONS]
-a <addr>[,<addr>]* \tThe target address [amqp[s]://domain[/name]]
-c # \tNumber of messages to send before exiting [0=forever]
-b # \tSize of message body in bytes [1024]
-p # \tSend batches of # messages (wait for replies before sending next batch if -R) [1024]
-w # \t# outgoing window size [0]
-e # \t# seconds to report statistics, 0 = end of test [0]
-R \tWait for a reply to each sent message
-t # \tInactivity timeout in seconds, -1 = no timeout [-1]
-W # \tIncoming window size [0]
-B # \tArgument to Messenger::recv(n) [-1]
-N <name> \tSet the container name to <name>
-V \tEnable debug logging"""
def parse_options( argv ):
parser = optparse.OptionParser(usage=usage)
parser.add_option("-a", dest="targets", action="append", type="string")
parser.add_option("-c", dest="msg_count", type="int", default=0)
parser.add_option("-b", dest="msg_size", type="int", default=1024)
parser.add_option("-p", dest="send_batch", type="int", default=1024)
parser.add_option("-w", dest="outgoing_window", type="int")
parser.add_option("-e", dest="report_interval", type="int", default=0)
parser.add_option("-R", dest="get_replies", action="store_true")
parser.add_option("-t", dest="timeout", type="int", default=-1)
parser.add_option("-W", dest="incoming_window", type="int")
parser.add_option("-B", dest="recv_count", type="int", default=-1)
parser.add_option("-N", dest="name", type="string")
parser.add_option("-V", dest="verbose", action="store_true")
return parser.parse_args(args=argv)
class Statistics(object):
def __init__(self):
self.start_time = 0.0
self.latency_samples = 0
self.latency_total = 0.0
self.latency_min = None
self.latency_max = None
def start(self):
self.start_time = time.time()
def msg_received(self, msg):
ts = msg.creation_time
if ts:
l = long(time.time() * 1000) - ts
if l > 0.0:
self.latency_total += l
self.latency_samples += 1
if self.latency_samples == 1:
self.latency_min = self.latency_max = l
else:
if self.latency_min > l:
self.latency_min = l
if self.latency_max < l:
self.latency_max = l
def report(self, sent, received):
secs = time.time() - self.start_time
print("Messages sent: %d recv: %d" % (sent, received) )
print("Total time: %f sec" % secs )
if secs:
print("Throughput: %f msgs/sec" % (sent/secs) )
if self.latency_samples:
print("Latency (sec): %f min %f max %f avg" % (self.latency_min/1000.0,
self.latency_max/1000.0,
(self.latency_total/self.latency_samples)/1000.0))
def process_replies( messenger, message, stats, max_count, log):
"""
Return the # of reply messages received
"""
received = 0
log.debug("Calling pn_messenger_recv(%d)", max_count)
messenger.recv( max_count )
log.debug("Messages on incoming queue: %d", messenger.incoming)
while messenger.incoming > 0:
messenger.get( message )
received += 1
# TODO: header decoding?
stats.msg_received( message )
# uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong;
return received
def main(argv=None):
opts = parse_options(argv)[0]
if opts.targets is None:
opts.targets = ["amqp://0.0.0.0"]
stats = Statistics()
sent = 0
received = 0
target_index = 0
log = logging.getLogger("msgr-send")
log.addHandler(logging.StreamHandler())
if opts.verbose:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
message = Message()
message.reply_to = "~"
message.body = "X" * opts.msg_size
reply_message = Message()
messenger = Messenger( opts.name )
if opts.outgoing_window is not None:
messenger.outgoing_window = opts.outgoing_window
if opts.timeout > 0:
opts.timeout *= 1000
messenger.timeout = opts.timeout
messenger.start()
# unpack targets that were specified using comma-separated list
#
targets = []
for x in opts.targets:
z = x.split(",")
for y in z:
if y:
targets.append(y)
stats.start()
while opts.msg_count == 0 or sent < opts.msg_count:
# send a message
message.address = targets[target_index]
if target_index == len(targets) - 1:
target_index = 0
else:
target_index += 1
message.correlation_id = sent
message.creation_time = long(time.time() * 1000)
messenger.put( message )
sent += 1
if opts.send_batch and (messenger.outgoing >= opts.send_batch):
if opts.get_replies:
while received < sent:
# this will also transmit any pending sent messages
received += process_replies( messenger, reply_message,
stats, opts.recv_count, log )
else:
log.debug("Calling pn_messenger_send()")
messenger.send()
log.debug("Messages received=%d sent=%d", received, sent)
if opts.get_replies:
# wait for the last of the replies
while received < sent:
count = process_replies( messenger, reply_message, stats,
opts.recv_count, log )
received += count
log.debug("Messages received=%d sent=%d", received, sent)
elif messenger.outgoing > 0:
log.debug("Calling pn_messenger_send()")
messenger.send()
messenger.stop()
stats.report( sent, received )
return 0
if __name__ == "__main__":
sys.exit(main())