# blob: aa3a23a34be034bf15d9091b0c5c0e94bd7cbb88
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton import Message
class Timeout(object):
    """Timer callback object that relays expiry to the handler owning it."""

    def __init__(self, parent):
        # ``parent`` is the object whose ``timeout()`` method is invoked when
        # the scheduled timer task fires.
        self.parent = parent

    def on_timer_task(self, event):
        """Tell the parent the timer fired; ``event`` itself is not used."""
        owner = self.parent
        owner.timeout()
class DrainMessagesHandler(MessagingHandler):
    """Check that a drain with surplus credit delivers every pending message.

    A sender and a receiver are attached to the same address on one
    connection. Up to ten presettled "Hello World" messages are sent. The
    receiver grants credit one at a time for the first few messages and then
    issues a drain of 20. The test passes (``error`` becomes ``None``) once
    ten messages have been counted and the link credit is zero.
    """

    def __init__(self, address):
        # Zero prefetch stops proton from automatically issuing 10 credits;
        # this test hands out credit explicitly.
        super(DrainMessagesHandler, self).__init__(prefetch=0)
        self.address = address
        self.conn = self.sender = self.receiver = None
        self.sent_count = self.received_count = 0
        # Overwritten with None on success or with a timeout message.
        self.error = "Unexpected Exit"

    def timeout(self):
        """Record the timeout as the failure reason and close the connection."""
        self.error = "Timeout Expired"
        self.conn.close()

    def on_start(self, event):
        """Arm the 5 second timer, connect, attach both links, grant 1 credit."""
        self.timer = event.reactor.schedule(5, Timeout(self))
        container = event.container
        self.conn = container.connect(self.address)
        # Sender and receiver deliberately share one address.
        link_address = "org.apache.dev"
        self.receiver = container.create_receiver(self.conn, link_address)
        self.sender = container.create_sender(self.conn, link_address)
        self.receiver.flow(1)

    def on_sendable(self, event):
        """Send one presettled message per call until ten have been sent."""
        if self.sent_count >= 10:
            return
        delivery = event.sender.send(Message(body="Hello World"))
        delivery.settle()
        self.sent_count += 1

    def on_message(self, event):
        """Count expected messages, manage credit, and detect completion."""
        if not (event.receiver == self.receiver):
            return

        if event.message.body == "Hello World":
            self.received_count += 1

        count = self.received_count
        if count == 4:
            # Drain with 20 credits: all ten messages the sender sends should
            # now arrive. The router also answers with a flow frame carrying
            # drain=True, but there is no way here to confirm that frame
            # reached the receiver.
            event.receiver.drain(20)
        elif count < 4:
            event.receiver.flow(1)

        # Zero link credit means no further messages will be delivered; with
        # ten messages counted, that shows the drain worked.
        finished = count == 10 and event.link.credit == 0
        if not finished:
            return
        self.error = None
        self.timer.cancel()
        for endpoint in (self.receiver, self.sender, self.conn):
            endpoint.close()

    def run(self):
        """Run this handler in a proton Container until it completes."""
        container = Container(self)
        container.run()
class DrainOneMessageHandler(DrainMessagesHandler):
    """Variant of the drain test in which the drain carries a single credit.

    After the fourth message a drain of 1 is issued, so exactly one more
    message is expected. The test passes once five messages have been counted
    and the link credit is zero.
    """

    def __init__(self, address):
        super(DrainOneMessageHandler, self).__init__(address)

    def on_message(self, event):
        """Count expected messages, manage credit, and detect completion."""
        if not (event.receiver == self.receiver):
            return

        if event.message.body == "Hello World":
            self.received_count += 1

        count = self.received_count
        if count == 4:
            # Drain of 1 after the fourth message: from here on only one more
            # message should be received.
            event.receiver.drain(1)
        elif count < 4:
            event.receiver.flow(1)

        # Zero link credit means no further messages will arrive; together
        # with five messages counted (four earlier plus one for drain=1),
        # that shows the drain worked.
        finished = count == 5 and event.link.credit == 0
        if not finished:
            return
        self.error = None
        self.timer.cancel()
        for endpoint in (self.receiver, self.sender, self.conn):
            endpoint.close()
class DrainNoMessagesHandler(MessagingHandler):
    """Drain a receiver when no message is ever sent.

    A sender and a receiver are attached to the same address. The handler
    never sends a message; each time the sender becomes sendable it issues a
    drain of 1 on the receiver. The test passes (``error`` becomes ``None``)
    when a link flow event finds the receiver's credit at zero.
    """

    def __init__(self, address):
        # Zero prefetch stops proton from automatically issuing 10 credits.
        super(DrainNoMessagesHandler, self).__init__(prefetch=0)
        self.address = address
        self.conn = self.sender = self.receiver = None
        # Overwritten with None on success or with a timeout message.
        self.error = "Unexpected Exit"

    def timeout(self):
        """Record the timeout as the failure reason and close the connection."""
        self.error = "Timeout Expired"
        self.conn.close()

    def on_start(self, event):
        """Arm the 5 second timer, connect, attach both links, grant 1 credit."""
        self.timer = event.reactor.schedule(5, Timeout(self))
        container = event.container
        self.conn = container.connect(self.address)
        # Sender and receiver deliberately share one address.
        link_address = "org.apache.dev"
        self.receiver = container.create_receiver(self.conn, link_address)
        self.sender = container.create_sender(self.conn, link_address)
        self.receiver.flow(1)

    def on_sendable(self, event):
        """Issue a drain of 1 on the receiver instead of sending anything."""
        self.receiver.drain(1)

    def on_link_flow(self, event):
        """Declare success once the receiver has no credit left."""
        if self.receiver.credit != 0:
            return
        self.error = None
        self.timer.cancel()
        self.conn.close()

    def run(self):
        """Run this handler in a proton Container until it completes."""
        container = Container(self)
        container.run()
class DrainNoMoreMessagesHandler(MessagingHandler):
    """Drain a receiver after the only message has already been sent.

    A sender and a receiver are attached to the same address. Exactly one
    "Hello World" message is sent. When a settlement is reported the receiver
    issues a drain of 1. The test passes (``error`` becomes ``None``) when a
    link flow event finds the receiver's credit at zero.
    """

    def __init__(self, address):
        # Zero prefetch stops proton from automatically issuing 10 credits.
        super(DrainNoMoreMessagesHandler, self).__init__(prefetch=0)
        self.address = address
        self.conn = self.sender = self.receiver = None
        self.sent = self.rcvd = 0
        # Overwritten with None on success or with a timeout message.
        self.error = "Unexpected Exit"

    def timeout(self):
        """Record a timeout failure with the counters, then close the connection."""
        counters = (self.sent, self.rcvd)
        self.error = "Timeout Expired: sent=%d rcvd=%d" % counters
        self.conn.close()

    def on_start(self, event):
        """Arm the 5 second timer, connect, attach both links, grant 1 credit."""
        self.timer = event.reactor.schedule(5, Timeout(self))
        container = event.container
        self.conn = container.connect(self.address)
        # Sender and receiver deliberately share one address.
        link_address = "org.apache.dev"
        self.receiver = container.create_receiver(self.conn, link_address)
        self.sender = container.create_sender(self.conn, link_address)
        self.receiver.flow(1)

    def on_sendable(self, event):
        """Send the single test message the first time the sender is sendable."""
        if self.sent != 0:
            return
        event.sender.send(Message(body="Hello World"))
        self.sent += 1

    def on_message(self, event):
        """Count every received message."""
        self.rcvd += 1

    def on_settled(self, event):
        """On settlement, issue a drain of 1 on the receiver."""
        self.receiver.drain(1)

    def on_link_flow(self, event):
        """Declare success once the receiver has no credit left."""
        if self.receiver.credit != 0:
            return
        self.error = None
        self.timer.cancel()
        self.conn.close()

    def run(self):
        """Run this handler in a proton Container until it completes."""
        container = Container(self)
        container.run()