DISPATCH-2324 - Coordinate port assignment between all parallel tests spawned by ctest -j (#123) (#1567)

diff --git a/tests/system_test.py b/tests/system_test.py
index 8bf3dd0..b63b3f8 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -29,6 +29,7 @@
 """
 
 import errno
+import fcntl
 import json
 import logging
 import os
@@ -747,6 +748,9 @@
     # The root directory for Tester directories, under top_dir
     root_dir = os.path.abspath(__name__ + '.dir')
 
+    # Minimum and maximum port number for free port searches
+    port_range = (20000, 30000)
+
     def __init__(self, id):
         """
         @param id: module.class.method or False if no directory should be created
@@ -754,6 +758,9 @@
         self.directory = os.path.join(self.root_dir, *id.split('.')) if id else None
         self.cleanup_list = []
 
+        self.port_file = pathlib.Path(self.top_dir, "next_port.lock").open("a+t")
+        self.cleanup(self.port_file)
+
     def rmtree(self):
         """Remove old test class results directory"""
         if self.directory:
@@ -765,6 +772,46 @@
             os.makedirs(self.directory)
             os.chdir(self.directory)
 
+    def _next_port(self) -> int:
+        """Reads and increments value stored in self.port_file, under an exclusive file lock.
+
+        When a lock cannot be acquired immediately, fcntl.lockf blocks.
+
+        Failure possibilities:
+            File locks may not work correctly on network filesystems. We still should be no worse off than we were.
+
+            This method always unlocks the lock file, so it should not ever deadlock other tests running in parallel.
+            Even if that happened, the lock is unlocked by the OS when the file is closed, which happens automatically
+            when the process that opened and locked it ends.
+
+            Invalid content in the self.port_file will break this method. Manual intervention is then required.
+        """
+        try:
+            fcntl.flock(self.port_file, fcntl.LOCK_EX)
+
+            # read old value
+            self.port_file.seek(0, os.SEEK_END)
+            if self.port_file.tell() != 0:
+                self.port_file.seek(0)
+                port = int(self.port_file.read())
+            else:
+                # file is empty
+                port = random.randint(self.port_range[0], self.port_range[1])
+
+            next_port = port + 1
+            if next_port >= self.port_range[1]:
+                next_port = self.port_range[0]
+
+            # write new value
+            self.port_file.seek(0)
+            self.port_file.truncate(0)
+            self.port_file.write(str(next_port))
+            self.port_file.flush()
+
+            return port
+        finally:
+            fcntl.flock(self.port_file, fcntl.LOCK_UN)
+
     def teardown(self):
         """Clean up (tear-down, stop or close) objects recorded via cleanup()"""
         self.cleanup_list.reverse()
@@ -798,27 +845,22 @@
     port_range = (20000, 30000)
     next_port = random.randint(port_range[0], port_range[1])
 
-    @classmethod
-    def get_port(cls, protocol_family='IPv4'):
+    def get_port(self, protocol_family='IPv4'):
         """Get an unused port"""
-        def advance():
-            """Advance with wrap-around"""
-            cls.next_port += 1
-            if cls.next_port >= cls.port_range[1]:
-                cls.next_port = cls.port_range[0]
-        start = cls.next_port
-        while not port_available(cls.next_port, protocol_family):
-            advance()
-            if cls.next_port == start:
-                raise Exception("No available ports in range %s", cls.port_range)
-        p = cls.next_port
-        advance()
+        p = self._next_port()
+        start = p
+        while not port_available(p, protocol_family):
+            p = self._next_port()
+            if p == start:
+                raise Exception("No available ports in range %s", self.port_range)
         return p
 
 
 class TestCase(unittest.TestCase, Tester):  # pylint: disable=too-many-public-methods
     """A TestCase that sets up its own working directory and is also a Tester."""
 
+    tester: Tester
+
     def __init__(self, test_method):
         unittest.TestCase.__init__(self, test_method)
         Tester.__init__(self, self.id())
diff --git a/tests/system_tests_broker.py b/tests/system_tests_broker.py
index a1603c0..954f5ea 100644
--- a/tests/system_tests_broker.py
+++ b/tests/system_tests_broker.py
@@ -40,7 +40,7 @@
         def setUpClass(cls):
             """Start 3 qpidd brokers, wait for them to be ready."""
             super(DistributedQueueTest, cls).setUpClass()
-            cls.qpidds = [cls.tester.qpidd('qpidd%s' % i, port=cls.get_port(), wait=False)
+            cls.qpidds = [cls.tester.qpidd('qpidd%s' % i, port=cls.tester.get_port(), wait=False)
                           for i in range(3)]
             for q in cls.qpidds:
                 q.wait_ready()
diff --git a/tests/system_tests_management.py b/tests/system_tests_management.py
index 7d03741..2ad4f8b 100644
--- a/tests/system_tests_management.py
+++ b/tests/system_tests_management.py
@@ -66,28 +66,28 @@
         # Stand-alone router
         conf0 = Qdrouterd.Config([
             ('router', {'mode': 'standalone', 'id': 'solo', 'metadata': 'selftest;solo'}),
-            ('listener', {'name': 'l0', 'port': cls.get_port(), 'role': 'normal'}),
+            ('listener', {'name': 'l0', 'port': cls.tester.get_port(), 'role': 'normal'}),
             # Extra listeners to exercise managment query
-            ('listener', {'name': 'l1', 'port': cls.get_port(), 'role': 'normal'}),
-            ('listener', {'name': 'l2', 'port': cls.get_port(), 'role': 'normal'})
+            ('listener', {'name': 'l1', 'port': cls.tester.get_port(), 'role': 'normal'}),
+            ('listener', {'name': 'l2', 'port': cls.tester.get_port(), 'role': 'normal'})
         ])
         cls._router = cls.tester.qdrouterd(config=conf0, wait=False)
 
         # Trio of interior routers linked in a line so we can see some next-hop values.
         conf0 = Qdrouterd.Config([
             ('router', {'mode': 'interior', 'id': 'router0'}),
-            ('listener', {'port': cls.get_port(), 'role': 'normal'}),
-            ('listener', {'port': cls.get_port(), 'role': 'inter-router'})
+            ('listener', {'port': cls.tester.get_port(), 'role': 'normal'}),
+            ('listener', {'port': cls.tester.get_port(), 'role': 'inter-router'})
         ])
         conf1 = Qdrouterd.Config([
             ('router', {'mode': 'interior', 'id': 'router1'}),
-            ('listener', {'port': cls.get_port(), 'role': 'normal'}),
+            ('listener', {'port': cls.tester.get_port(), 'role': 'normal'}),
             ('connector', {'port': conf0.sections('listener')[1]['port'], 'role':'inter-router'}),
-            ('listener', {'port': cls.get_port(), 'role': 'inter-router'})
+            ('listener', {'port': cls.tester.get_port(), 'role': 'inter-router'})
         ])
         conf2 = Qdrouterd.Config([
             ('router', {'mode': 'interior', 'id': 'router2'}),
-            ('listener', {'port': cls.get_port(), 'role': 'normal'}),
+            ('listener', {'port': cls.tester.get_port(), 'role': 'normal'}),
             ('connector', {'port': conf1.sections('listener')[1]['port'], 'role':'inter-router'})
         ])
         cls._routers = [cls.tester.qdrouterd(config=c, wait=False) for c in [conf0, conf1, conf2]]
@@ -95,7 +95,7 @@
         # Standalone router for logging tests (avoid interfering with logging for other tests.)
         conflog = Qdrouterd.Config([
             ('router', {'mode': 'standalone', 'id': 'logrouter'}),
-            ('listener', {'port': cls.get_port(), 'role': 'normal'}),
+            ('listener', {'port': cls.tester.get_port(), 'role': 'normal'}),
         ])
         cls._logrouter = cls.tester.qdrouterd(config=conflog, wait=False)