blob: fc29607624bb7eb9cf9e40fd9389d045a6740a38 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys, os
from logging import getLogger
from unittest import TestCase
from qpid.selector import Selector
from qpid.messaging import *
from qpid.messaging.exceptions import InternalError
class SelectorTests(TestCase):
"""Make sure that using a connection after a selector stops raises and doesn't hang"""
def setUp(self):
self.log = getLogger("qpid.messaging")
self.propagate = self.log.propagate
self.log.propagate = False # Disable for tests, expected log output is noisy
def tearDown(self):
# Clear out any broken selector so next test can function
Selector.DEFAULT = None
self.log.propagate = self.propagate # Restore setting
def configure(self, config):
self.broker = config.broker
def test_use_after_stop(self):
"""Create endpoints, stop the selector, try to use them"""
c = Connection.establish(self.broker)
ssn = c.session()
r = ssn.receiver("foo;{create:always,delete:always}")
s = ssn.sender("foo;{create:always,delete:always}")
Selector.DEFAULT.stop()
self.assertRaises(InternalError, c.session)
self.assertRaises(InternalError, ssn.sender, "foo")
self.assertRaises(InternalError, s.send, "foo")
self.assertRaises(InternalError, r.fetch)
self.assertRaises(InternalError, Connection.establish, self.broker)
def test_use_after_fork(self):
c = Connection.establish(self.broker)
pid = os.fork()
if pid: # Parent
self.assertEqual((pid, 0), os.waitpid(pid, 0))
self.assertEqual("child", c.session().receiver("child;{create:always}").fetch().content)
else: # Child
try:
# Can establish new connections
s = Connection.establish(self.broker).session().sender("child;{create:always}")
self.assertRaises(InternalError, c.session) # But can't use parent connection
s.send("child")
os._exit(0)
except Exception, e:
print >>sys.stderr, "test child process error: %s" % e
os.exit(1)
finally:
os._exit(1) # Hard exit from child to stop remaining tests running twice