blob: 9b3748ad86b850154ffeeacc53ab5bc828a8db11 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton import Message, Endpoint
from system_test import main_module, TIMEOUT, TestTimeout
from system_test import unittest
class DrainMessagesHandler(MessagingHandler):
def __init__(self, address):
# prefetch is set to zero so that proton does not automatically issue 10 credits.
super(DrainMessagesHandler, self).__init__(prefetch=0)
self.conn = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.received_count = 0
self.address = address
self.error = "Unexpected Exit"
def timeout(self):
self.error = "Timeout Expired: sent: %d rcvd: %d" % (self.sent_count, self.received_count)
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
# Create a sender and a receiver. They are both listening on the same address
self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
self.sender = event.container.create_sender(self.conn, "org.apache.dev")
self.receiver.flow(1)
def on_link_flow(self, event):
if event.link.is_sender and event.link.credit \
and event.link.state & Endpoint.LOCAL_ACTIVE \
and event.link.state & Endpoint.REMOTE_ACTIVE :
self.on_sendable(event)
# The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
# messages. That along with 10 messages received indicates that the drain worked and we can
# declare that the test is successful
if self.received_count == 10 and event.link.credit == 0:
self.error = None
self.timer.cancel()
self.receiver.close()
self.sender.close()
self.conn.close()
def on_sendable(self, event):
if self.sent_count < 10:
msg = Message(body="Hello World", properties={'seq': self.sent_count})
dlv = event.sender.send(msg)
dlv.settle()
self.sent_count += 1
def on_message(self, event):
if event.receiver == self.receiver:
if "Hello World" == event.message.body:
self.received_count += 1
if self.received_count < 4:
event.receiver.flow(1)
elif self.received_count == 4:
# We are issuing a drain of 20. This means that we will receive all the 10 messages
# that the sender is sending. The router will also send back a response flow frame with
# drain=True but I don't have any way of making sure that the response frame reached the
# receiver
event.receiver.drain(20)
def run(self):
Container(self).run()
class DrainOneMessageHandler(DrainMessagesHandler):
def __init__(self, address):
super(DrainOneMessageHandler, self).__init__(address)
def on_message(self, event):
if event.receiver == self.receiver:
if "Hello World" == event.message.body:
self.received_count += 1
if self.received_count < 4:
event.receiver.flow(1)
elif self.received_count == 4:
# We are issuing a drain of 1 after we receive the 4th message.
# This means that going forward, we will receive only one more message.
event.receiver.drain(1)
# The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
# messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1)
# indicates that the drain worked and we can declare that the test is successful
if self.received_count == 5 and event.link.credit == 0:
self.error = None
self.timer.cancel()
self.receiver.close()
self.sender.close()
self.conn.close()
class DrainNoMessagesHandler(MessagingHandler):
def __init__(self, address):
# prefetch is set to zero so that proton does not automatically issue 10 credits.
super(DrainNoMessagesHandler, self).__init__(prefetch=0)
self.conn = None
self.sender = None
self.receiver = None
self.address = address
self.error = "Unexpected Exit"
def timeout(self):
self.error = "Timeout Expired"
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
# Create a sender and a receiver. They are both listening on the same address
self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
self.sender = event.container.create_sender(self.conn, "org.apache.dev")
self.receiver.flow(1)
def on_sendable(self, event):
self.receiver.drain(1)
def on_link_flow(self, event):
if self.receiver.credit == 0:
self.error = None
self.timer.cancel()
self.conn.close()
def run(self):
Container(self).run()
class DrainNoMoreMessagesHandler(MessagingHandler):
def __init__(self, address):
# prefetch is set to zero so that proton does not automatically issue 10 credits.
super(DrainNoMoreMessagesHandler, self).__init__(prefetch=0)
self.conn = None
self.sender = None
self.receiver = None
self.address = address
self.sent = 0
self.rcvd = 0
self.error = "Unexpected Exit"
def timeout(self):
self.error = "Timeout Expired: sent=%d rcvd=%d" % (self.sent, self.rcvd)
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
# Create a sender and a receiver. They are both listening on the same address
self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
self.sender = event.container.create_sender(self.conn, "org.apache.dev")
self.receiver.flow(1)
def on_sendable(self, event):
if self.sent == 0:
msg = Message(body="Hello World")
event.sender.send(msg)
self.sent += 1
def on_message(self, event):
self.rcvd += 1
def on_settled(self, event):
self.receiver.drain(1)
def on_link_flow(self, event):
if self.receiver.credit == 0:
self.error = None
self.timer.cancel()
self.conn.close()
def run(self):
Container(self).run()
class DrainMessagesMoreHandler(MessagingHandler):
"""
Make sure the clients can send/receive after going through a drain cycle.
Send phase
1. Sender sending first 10 messages
2. Sender paused waiting for drain to finish
3. Sender is sending second 10 messages
4. Sender is done.
Receive phase
1. Receiver receiving first four messages; At #4 receiver issues drain 4,20
2. Reciever receives messages 5..10.
When 10 messages have been received and link credit =0 the drain is done
Receiver issues 10 credits
3. Receiver recieves messages 11..20.
4. Receiver is done
At issue in DISPATCH-1055 is that the 10 credits issued in Receive step 2
are never propagated across a link route to the 'broker'.
This code is instantiated with and without the link route to demonstrate that
it works properly when the 'test-router' is handling the drain by itself
and that it fails only on the link route.
"""
def __init__(self, address, route_name):
# prefetch is set to zero so that proton does not automatically issue 10 credits.
super(DrainMessagesMoreHandler, self).__init__(prefetch=0)
self.conn = None
self.sender = None
self.receiver = None
self.sent_count = 0
self.received_count = 0
self.address = address
self.error = "Unexpected Exit"
self.send_phase = 1
self.recv_phase = 1
self.route_name = route_name
self.verbose_printing = False
def show_state(self):
return str("send_phase:" + str(self.send_phase)
+ ", sent_count:" + str(self.sent_count)
+ ", recv_phase:" + str(self.recv_phase)
+ ", receive_count:" + str(self.received_count)
+ ", receiver_credit:" + str(self.receiver.credit)
+ ", sender_credit:" + str(self.sender.credit))
def printme(self, str):
if (self.verbose_printing):
print(str + " " + self.show_state())
def timeout(self):
self.error = "Timeout Expired: sent: %d rcvd: %d" % (self.sent_count, self.received_count)
self.conn.close()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
self.conn = event.container.connect(self.address)
# Create a sender and a receiver. They are both listening on the same address
self.receiver = event.container.create_receiver(self.conn, source=self.route_name)
self.sender = event.container.create_sender(self.conn, target=self.route_name)
self.receiver.flow(1)
def on_link_flow(self, event):
if event.link.is_sender and event.link.credit \
and event.link.state & Endpoint.LOCAL_ACTIVE \
and event.link.state & Endpoint.REMOTE_ACTIVE :
self.on_sendable(event)
# The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
# messages. That along with 10 messages received indicates that the drain worked.
if self.send_phase == 2 and self.received_count == 10 and event.link.credit == 0:
self.printme("sender transitions to phase 3 - drain completed, send new flow now")
self.receiver.flow(10)
self.send_phase = 3
if event.link.is_sender and event.link.credit \
and event.link.state & Endpoint.LOCAL_ACTIVE \
and event.link.state & Endpoint.REMOTE_ACTIVE :
self.on_sendable(event)
self.printme(("sender " if event.link.is_sender else "receiver ") + "exit on_link_flow:")
def on_sendable(self, event):
if event.link.is_sender and self.send_phase == 1 and self.sent_count < 10:
msg = Message(body="Hello World", properties={'seq': self.sent_count})
dlv = event.sender.send(msg)
dlv.settle()
self.sent_count += 1
if self.sent_count == 10:
self.printme("sender transitions to phase 2 - wait for drain to finish")
self.send_phase = 2
elif event.link.is_sender and self.send_phase == 3 and self.sent_count < 20:
msg = Message(body="Hello World", properties={'seq': self.sent_count})
dlv = event.sender.send(msg)
dlv.settle()
self.sent_count += 1
if self.sent_count == 20:
self.printme("sender transitions to phase 4 - done sending")
self.send_phase = 4
self.printme(("sender " if event.link.is_sender else "receiver ") + "exit on_sendable:")
def on_message(self, event):
if event.receiver == self.receiver:
if "Hello World" == event.message.body:
self.received_count += 1
if self.recv_phase == 1 and self.received_count < 4:
event.receiver.flow(1)
elif self.recv_phase == 1 and self.received_count == 4:
# We are issuing a drain of 20. This means that we will receive all the 10 messages
# that the sender is sending. The router will also send back a response flow frame with
# drain=True but I don't have any way of making sure that the response frame reached the
# receiver
self.printme("receiver transitions to phase 2 - sending drain now")
event.receiver.drain(20)
self.recv_phase = 2
elif self.recv_phase == 2 and self.received_count == 10:
self.printme("receiver transitions to phase 3")
self.recv_phase = 3
msg = Message(body="Hello World", properties={'seq': self.sent_count})
dlv = self.sender.send(msg)
dlv.settle()
self.sent_count += 1
elif self.recv_phase == 3 and self.received_count == 20:
self.printme("receiver transitions to phase 4 - test is completed successfully")
self.recv_phase = 4
self.error = None
self.timer.cancel()
self.receiver.close()
self.sender.close()
self.conn.close()
self.printme("exit on_message:")
def run(self):
Container(self).run()
if __name__ == '__main__':
unittest.main(main_module())