# blob: 4226bc1691f56a5eb77236baa1cf5e8152d1440e [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import json
from proton import Message, symbol
from system_test import TestCase, Qdrouterd, Process, TIMEOUT
from system_test import unittest
from subprocess import PIPE, STDOUT
from proton.handlers import MessagingHandler
from proton.reactor import Container
from qpid_dispatch_internal.compat import BINARY
class RouterMessageLogTestBase(TestCase):
    """Common base for the router message-logging system tests.

    Supplies the ``run_qdmanage`` helper. When no explicit bus address is
    given, the helper calls ``self.address()``, which the subclasses define.
    """

    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
        """Invoke ``qdmanage`` and return everything it printed.

        :param cmd: qdmanage arguments as a single string; split on ' '.
        :param input: optional text fed to the process on stdin.
        :param expect: expected exit status, handed through to ``self.popen``.
        :param address: bus address; defaults to ``self.address()`` when falsy.
        :return: the process output (stderr is merged into stdout).
        :raises Exception: when ``teardown()`` raises; the original error text
            is combined with the captured output.
        """
        argv = ['qdmanage'] + cmd.split(' ')
        argv += [
            '--bus', address or self.address(),
            '--indent=-1',
            '--timeout', str(TIMEOUT),
        ]
        proc = self.popen(argv,
                          stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                          expect=expect,
                          universal_newlines=True)
        output = proc.communicate(input)[0]
        try:
            proc.teardown()
        except Exception as err:
            # Attach the captured output so the failure is diagnosable.
            raise Exception("%s\n%s" % (err, output))
        return output
class RouterMessageLogTestAll(RouterMessageLogTestBase):
    """System tests to check log messages emitted by router"""

    @classmethod
    def setUpClass(cls):
        """Start a standalone router whose listener sets
        ``messageLoggingComponents`` to ``all``."""
        super(RouterMessageLogTestAll, cls).setUpClass()
        name = "test-router"
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR'}),
            ('listener', {'port': cls.tester.get_port(), 'messageLoggingComponents': 'all'}),
            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
        ])
        cls.router = cls.tester.qdrouterd(name, config)
        cls.router.wait_ready()

    def address(self):
        """Return the first address of the router started in ``setUpClass``."""
        return self.router.addresses[0]

    def test_log_message_all(self):
        """Send one message through the router and verify that both the
        'Received' and the 'Sent' MESSAGE log entries carry every expected
        message field."""
        test = LogMessageTest(self.address())
        test.run()
        self.assertTrue(test.message_received)

        logs = json.loads(self.run_qdmanage("get-log"))
        message_logs = [log for log in logs if log[0] == u'MESSAGE']
        self.assertTrue(message_logs)

        test_message = [log for log in message_logs if "message-id=\"123455\"" in log[2]]
        # Exactly two entries are expected: one Received, then one Sent.
        # assertEqual reports the actual count if the expectation fails.
        self.assertEqual(2, len(test_message), message_logs)
        self.assertIn('Received', test_message[0][2])
        self.assertIn('Sent', test_message[1][2])

        # Fragments that must appear in the text of both log entries.
        expected_fragments = (
            'user-id=b"testuser"',
            'subject="test-subject"',
            'reply-to="hello_world"',
            'correlation-id=89',
            'content-type=:"text/html; charset=utf-8"',
            'content-encoding=:"gzip, deflate"',
            'group-id="group1", group-sequence=0, reply-to-group-id="group0"',
            'app-properties={"app-property"=[10, 20, 30], "some-other"=:"O_one"}',
            'creation-time="2017-02-22',
            '10:23.883',
        )
        for log in test_message:
            for fragment in expected_fragments:
                self.assertIn(fragment, log[2])
class RouterMessageLogTestNone(RouterMessageLogTestBase):
    """System tests to check log messages emitted by router"""

    @classmethod
    def setUpClass(cls):
        """Start a standalone router whose listener sets no message-logging
        option."""
        super(RouterMessageLogTestNone, cls).setUpClass()
        name = "test-router"
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR'}),
            ('listener', {'port': cls.tester.get_port()}),
            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
        ])
        cls.router = cls.tester.qdrouterd(name, config)
        cls.router.wait_ready()

    def address(self):
        """Return the first address of the router started in ``setUpClass``."""
        return self.router.addresses[0]

    def test_log_message_none(self):
        """Send one message through the router and verify that no MESSAGE
        log entry was produced."""
        test = LogMessageTest(self.address())
        test.run()
        self.assertTrue(test.message_received)

        logs = json.loads(self.run_qdmanage("get-log"))
        message_logs = [log for log in logs if log[0] == u'MESSAGE']
        # Comparing against an empty list makes a failure show the
        # unexpected entries instead of a bare "False is not true".
        self.assertEqual([], message_logs)
class RouterMessageLogTestSome(RouterMessageLogTestBase):
    """System tests to check log messages emitted by router"""

    @classmethod
    def setUpClass(cls):
        """Start a standalone router whose listener uses the deprecated
        ``logMessage`` attribute with a subset of message fields."""
        super(RouterMessageLogTestSome, cls).setUpClass()
        name = "test-router"
        config = Qdrouterd.Config([
            ('router', {'mode': 'standalone', 'id': 'QDR'}),
            # logMessage has been deprecated. We are using it here so we can make sure that it is still
            # backward compatible.
            ('listener', {'port': cls.tester.get_port(), 'logMessage': 'user-id,subject,reply-to'}),
            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
            ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
        ])
        cls.router = cls.tester.qdrouterd(name, config)
        cls.router.wait_ready()

    def address(self):
        """Return the first address of the router started in ``setUpClass``."""
        return self.router.addresses[0]

    def test_log_message_some(self):
        """Send one message through the router and verify that the Received
        and Sent MESSAGE log entries contain exactly the configured fields."""
        test = LogMessageTest(self.address())
        test.run()
        self.assertTrue(test.message_received)

        logs = json.loads(self.run_qdmanage("get-log"))
        message_logs = [log for log in logs if log[0] == u'MESSAGE']
        self.assertTrue(message_logs)

        # One entry is expected for each direction, checked in this order.
        expected_entries = (
            u'Received Message{user-id=b"testuser", subject="test-subject", reply-to="hello_world"}',
            u'Sent Message{user-id=b"testuser", subject="test-subject", reply-to="hello_world"}',
        )
        for expected in expected_entries:
            matching = [log for log in message_logs if expected in log[2]]
            self.assertTrue(matching, message_logs)
class LogMessageTest(MessagingHandler):
    """Messaging handler that sends a single, fully populated message to the
    ``logMessageTest`` address and records whether it was received back."""

    def __init__(self, address):
        """Remember the connection address and reset the progress flags."""
        super(LogMessageTest, self).__init__(auto_accept=False)
        self.address = address
        self.sent = False
        self.message_received = False
        self.dest = "logMessageTest"

    def on_start(self, event):
        """Open one connection and attach a sender and a receiver to
        ``self.dest`` on it."""
        container = event.container
        connection = container.connect(self.address)
        container.create_sender(connection, self.dest)
        container.create_receiver(connection, self.dest)

    def on_sendable(self, event):
        """Send the test message the first time credit is available."""
        if self.sent:
            # Only one message is ever sent.
            return

        outgoing = Message()
        outgoing.address = self.address
        outgoing.id = '123455'
        outgoing.user_id = BINARY('testuser')
        outgoing.subject = 'test-subject'
        outgoing.content_type = 'text/html; charset=utf-8'
        outgoing.correlation_id = 89
        outgoing.creation_time = 1487772623.883
        outgoing.group_id = "group1"
        outgoing.reply_to = 'hello_world'
        outgoing.content_encoding = 'gzip, deflate'
        outgoing.reply_to_group_id = "group0"
        outgoing.properties = {
            'app-property': [10, 20, 30],
            'some-other': symbol("O_one"),
        }
        outgoing.body = u"Hello World!"

        event.sender.send(outgoing)
        self.sent = True

    def on_message(self, event):
        """Flag success and close the connection once the test body arrives."""
        if event.message.body == "Hello World!":
            self.message_received = True
            event.connection.close()

    def run(self):
        """Run a container with this handler until it finishes."""
        Container(self).run()
# Run the test cases when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()