Merge pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

diff --git a/sdks/python/apache_beam/utils/thread_pool_executor.py b/sdks/python/apache_beam/utils/thread_pool_executor.py
index 55b4015..a5443ed 100644
--- a/sdks/python/apache_beam/utils/thread_pool_executor.py
+++ b/sdks/python/apache_beam/utils/thread_pool_executor.py
@@ -55,13 +55,11 @@
 
 
 class _Worker(threading.Thread):
-  def __init__(
-      self, idle_worker_queue, permitted_thread_age_in_seconds, work_item):
+  def __init__(self, idle_worker_queue, work_item):
     super(_Worker, self).__init__()
     self._idle_worker_queue = idle_worker_queue
-    self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
     self._work_item = work_item
-    self._wake_event = threading.Event()
+    self._wake_semaphore = threading.Semaphore(0)
     self._lock = threading.Lock()
     self._shutdown = False
 
@@ -70,62 +68,28 @@
       self._work_item.run()
       self._work_item = None
 
-      # If we are explicitly awake then don't add ourselves back to the
-      # idle queue. This occurs in case 3 described below.
-      if not self._wake_event.is_set():
-        self._idle_worker_queue.put(self)
+      self._idle_worker_queue.put(self)
+      self._wake_semaphore.acquire()
+      if self._work_item is None:
+        return
 
-      self._wake_event.wait(self._permitted_thread_age_in_seconds)
-      with self._lock:
-        # When we are awoken, we may be in one of three states:
-        #  1) _work_item is set and _shutdown is False.
-        #     This represents the case when we have accepted work.
-        #  2) _work_item is unset and _shutdown is True.
-        #     This represents the case where either we timed out before
-        #     accepting work or explicitly were shutdown without accepting
-        #     any work.
-        #  3) _work_item is set and _shutdown is True.
-        #     This represents a race where we accepted work and also
-        #     were shutdown before the worker thread started processing
-        #     that work. In this case we guarantee to process the work
-        #     but we don't clear the event ensuring that the next loop
-        #     around through to the wait() won't block and we will exit
-        #     since _work_item will be unset.
-
-        # We only exit when _work_item is unset to prevent dropping of
-        # submitted work.
-        if self._work_item is None:
-          self._shutdown = True
-          return
-        if not self._shutdown:
-          self._wake_event.clear()
-
-  def accepted_work(self, work_item):
-    """Returns True if the work was accepted.
+  def assign_work(self, work_item):
+    """Assigns the work item and wakes up the thread.
 
     This method must only be called while the worker is idle.
     """
-    with self._lock:
-      if self._shutdown:
-        return False
-
-      self._work_item = work_item
-      self._wake_event.set()
-      return True
+    self._work_item = work_item
+    self._wake_semaphore.release()
 
   def shutdown(self):
-    """Marks this thread as shutdown possibly waking it up if it is idle."""
-    with self._lock:
-      if self._shutdown:
-        return
-      self._shutdown = True
-      self._wake_event.set()
+    """Wakes this thread with a None work item, telling it to shut down."""
+    self._wake_semaphore.release()
 
 
 class UnboundedThreadPoolExecutor(_base.Executor):
-  def __init__(self, permitted_thread_age_in_seconds=30):
-    self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
+  def __init__(self):
     self._idle_worker_queue = queue.Queue()
+    self._max_idle_threads = 16
     self._workers = weakref.WeakSet()
     self._shutdown = False
     self._lock = threading.Lock()  # Guards access to _workers and _shutdown
@@ -137,28 +101,25 @@
     """
     future = _base.Future()
     work_item = _WorkItem(future, fn, args, kwargs)
-    try:
-      # Keep trying to get an idle worker from the queue until we find one
-      # that accepts the work.
-      while not self._idle_worker_queue.get(
-          block=False).accepted_work(work_item):
-        pass
-      return future
-    except queue.Empty:
-      with self._lock:
-        if self._shutdown:
-          raise RuntimeError(
-              'Cannot schedule new tasks after thread pool '
-              'has been shutdown.')
+    with self._lock:
+      if self._shutdown:
+        raise RuntimeError(
+            'Cannot schedule new tasks after thread pool has been shutdown.')
+      try:
+        self._idle_worker_queue.get(block=False).assign_work(work_item)
 
-        worker = _Worker(
-            self._idle_worker_queue,
-            self._permitted_thread_age_in_seconds,
-            work_item)
+        # If there are more idle threads than the max allowed, shut one down.
+        if self._idle_worker_queue.qsize() > self._max_idle_threads:
+          try:
+            self._idle_worker_queue.get(block=False).shutdown()
+          except queue.Empty:
+            pass
+      except queue.Empty:
+        worker = _Worker(self._idle_worker_queue, work_item)
         worker.daemon = True
         worker.start()
         self._workers.add(worker)
-        return future
+    return future
 
   def shutdown(self, wait=True):
     with self._lock:
@@ -166,6 +127,7 @@
         return
 
       self._shutdown = True
+
       for worker in self._workers:
         worker.shutdown()