blob: 757b19c20aeb128e102ce4ad3bd7a1aa602324d9 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
import sys, optparse, time
import logging
from proton import *
# Python 3 has no separate ``long`` type; alias it to ``int`` so the rest of
# the script can call ``long(...)`` on either major version.  Only NameError
# is caught so that unrelated failures (or Ctrl-C) are never swallowed.
try:
    long()
except NameError:
    long = int
# Help text handed to optparse as the parser's usage string.  Values in
# square brackets are the defaults; the -b default is -1 to match the
# default registered for that option in parse_options().
usage = """
Usage: msgr-recv [OPTIONS]
-a <addr>[,<addr>]* \tAddresses to listen on [amqp://~0.0.0.0]
-c # \tNumber of messages to receive before exiting [0=forever]
-b # \tArgument to Messenger::recv(n) [-1]
-w # \tSize for incoming window [0]
-t # \tInactivity timeout in seconds, -1 = no timeout [-1]
-e # \t# seconds to report statistics, 0 = end of test [0] *TBD*
-R \tSend reply if 'reply-to' present
-W # \t# outgoing window size [0]
-F <addr>[,<addr>]* \tAddresses used for forwarding received messages
-N <name> \tSet the container name to <name>
-X <text> \tPrint '<text>\\n' to stdout after all subscriptions are created
-V \tEnable debug logging"""
def parse_options( argv ):
    """Parse the msgr-recv command line.

    argv is the list of argument strings to parse; it is passed straight to
    optparse, which falls back to sys.argv[1:] when it is None.

    Returns the (options, leftover_args) pair produced by
    OptionParser.parse_args().
    """
    # One row per command-line switch: the flag and the keyword arguments
    # that describe how optparse should store it.
    option_table = (
        ("-a", dict(dest="subscriptions", action="append", type="string")),
        ("-c", dict(dest="msg_count", type="int", default=0)),
        ("-b", dict(dest="recv_count", type="int", default=-1)),
        ("-w", dict(dest="incoming_window", type="int")),
        ("-t", dict(dest="timeout", type="int", default=-1)),
        ("-e", dict(dest="report_interval", type="int", default=0)),
        ("-R", dict(dest="reply", action="store_true")),
        ("-W", dict(dest="outgoing_window", type="int")),
        ("-F", dict(dest="forwarding_targets", action="append", type="string")),
        ("-N", dict(dest="name", type="string")),
        ("-X", dict(dest="ready_text", type="string")),
        ("-V", dict(dest="verbose", action="store_true")),
    )

    option_parser = optparse.OptionParser(usage=usage)
    for flag, settings in option_table:
        option_parser.add_option(flag, **settings)

    return option_parser.parse_args(args=argv)
class Statistics(object):
    """Collects elapsed-time and latency figures for a receive run.

    Latencies are accumulated in milliseconds and converted to seconds only
    when they are printed by report().
    """

    def __init__(self):
        self.start_time = 0.0      # time.time() value recorded by start()
        self.latency_samples = 0   # number of messages that contributed a latency
        self.latency_total = 0.0   # sum of the sampled latencies (milliseconds)
        self.latency_min = None    # smallest sampled latency; None until the first sample
        self.latency_max = None    # largest sampled latency; None until the first sample

    def start(self):
        """Mark the current wall-clock time as the start of the measured interval."""
        self.start_time = time.time()

    def msg_received(self, msg):
        """Fold the latency of one received message into the running totals.

        msg.creation_time is subtracted from the local clock expressed in
        milliseconds.  Messages without a creation time, and messages whose
        computed latency is not positive, are ignored.
        """
        created = msg.creation_time
        if not created:
            return

        # int() yields an arbitrary-precision integer on Python 3 and
        # promotes to long automatically on Python 2.
        latency = int(time.time() * 1000) - created
        if latency <= 0:
            return

        self.latency_total += latency
        self.latency_samples += 1
        if self.latency_samples == 1:
            self.latency_min = self.latency_max = latency
        else:
            self.latency_min = min(self.latency_min, latency)
            self.latency_max = max(self.latency_max, latency)

    def report(self, sent, received):
        """Print message counts, elapsed time, throughput and latency to stdout.

        sent and received are the message counters maintained by the caller.
        The latency line is printed only when at least one sample was taken.
        """
        secs = time.time() - self.start_time
        print("Messages sent: %d recv: %d" % (sent, received))
        print("Total time: %f sec" % secs)
        if secs:
            # This is the receiver: its rate is messages received per second.
            # Using the sent counter here reported 0 msgs/sec whenever the
            # receiver was neither replying nor forwarding.
            print("Throughput: %f msgs/sec" % (received / secs))
        if self.latency_samples:
            average = self.latency_total / self.latency_samples
            print("Latency (sec): %f min %f max %f avg" % (self.latency_min / 1000.0,
                                                           self.latency_max / 1000.0,
                                                           average / 1000.0))
def _split_addresses(specs):
    """Flatten a list of comma-separated address strings, dropping empty entries."""
    addresses = []
    for spec in specs or ():
        addresses.extend(addr for addr in spec.split(",") if addr)
    return addresses


def main(argv=None):
    """Receive messages with a Messenger, optionally replying to or forwarding them.

    argv is the list of command-line arguments handed to parse_options();
    None lets optparse read sys.argv[1:].

    The function subscribes to every configured address, receives until the
    requested message count is reached (forever when the count is 0), flushes
    any queued outgoing messages, stops the messenger and prints the collected
    statistics.  Returns 0.
    """
    opts = parse_options(argv)[0]
    if opts.subscriptions is None:
        opts.subscriptions = ["amqp://~0.0.0.0"]

    stats = Statistics()
    sent = 0
    received = 0
    forwarding_index = 0

    log = logging.getLogger("msgr-recv")
    # Attach the stream handler only once so that calling main() repeatedly
    # in the same process does not duplicate every log line.
    if not log.handlers:
        log.addHandler(logging.StreamHandler())
    log.setLevel(logging.DEBUG if opts.verbose else logging.INFO)

    message = Message()
    messenger = Messenger( opts.name )

    if opts.incoming_window is not None:
        messenger.incoming_window = opts.incoming_window
    # -W was parsed and documented but never applied; honor it like -w.
    if opts.outgoing_window is not None:
        messenger.outgoing_window = opts.outgoing_window

    # The command line gives the timeout in seconds; positive values are
    # scaled by 1000 before being handed to the messenger, while -1 (no
    # timeout) and 0 are passed through unchanged.
    timeout = opts.timeout
    if timeout > 0:
        timeout *= 1000
    messenger.timeout = timeout

    messenger.start()

    # unpack addresses that were specified using comma-separated list
    for address in _split_addresses(opts.subscriptions):
        log.debug("Subscribing to %s", address)
        messenger.subscribe(address)

    forwarding_targets = _split_addresses(opts.forwarding_targets)

    # hack to let test scripts know when the receivers are ready (so that the
    # senders may be started)
    if opts.ready_text:
        print("%s" % opts.ready_text)
        sys.stdout.flush()

    while opts.msg_count == 0 or received < opts.msg_count:
        log.debug("Calling pn_messenger_recv(%d)", opts.recv_count)
        messenger.recv(opts.recv_count)

        # start the timer only after receiving the first msg
        if received == 0:
            stats.start()

        log.debug("Messages on incoming queue: %d", messenger.incoming)
        while messenger.incoming > 0:
            messenger.get(message)
            received += 1
            # TODO: header decoding?
            stats.msg_received( message )

            if opts.reply and message.reply_to:
                log.debug("Replying to: %s", message.reply_to )
                message.address = message.reply_to
                message.creation_time = long(time.time() * 1000)
                messenger.put( message )
                sent += 1

            if forwarding_targets:
                # Hand messages to the forwarding targets in round-robin order.
                forward_addr = forwarding_targets[forwarding_index]
                forwarding_index = (forwarding_index + 1) % len(forwarding_targets)
                log.debug("Forwarding to: %s", forward_addr )
                message.address = forward_addr
                message.reply_to = None
                message.creation_time = long(time.time() * 1000)
                messenger.put( message )
                sent += 1

        log.debug("Messages received=%lu sent=%lu", received, sent)

    # this will flush any pending sends
    if messenger.outgoing > 0:
        log.debug("Calling pn_messenger_send()")
        messenger.send()

    messenger.stop()

    stats.report( sent, received )
    return 0
# Script entry point: run the receiver and hand its return value to the
# shell as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)