Merge pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance
diff --git a/sdks/python/apache_beam/utils/thread_pool_executor.py b/sdks/python/apache_beam/utils/thread_pool_executor.py
index 55b4015..a5443ed 100644
--- a/sdks/python/apache_beam/utils/thread_pool_executor.py
+++ b/sdks/python/apache_beam/utils/thread_pool_executor.py
@@ -55,13 +55,11 @@
class _Worker(threading.Thread):
- def __init__(
- self, idle_worker_queue, permitted_thread_age_in_seconds, work_item):
+ def __init__(self, idle_worker_queue, work_item):
super(_Worker, self).__init__()
self._idle_worker_queue = idle_worker_queue
- self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
self._work_item = work_item
- self._wake_event = threading.Event()
+ self._wake_semaphore = threading.Semaphore(0)
self._lock = threading.Lock()
self._shutdown = False
@@ -70,62 +68,28 @@
self._work_item.run()
self._work_item = None
- # If we are explicitly awake then don't add ourselves back to the
- # idle queue. This occurs in case 3 described below.
- if not self._wake_event.is_set():
- self._idle_worker_queue.put(self)
+ self._idle_worker_queue.put(self)
+ self._wake_semaphore.acquire()
+ if self._work_item is None:
+ return
- self._wake_event.wait(self._permitted_thread_age_in_seconds)
- with self._lock:
- # When we are awoken, we may be in one of three states:
- # 1) _work_item is set and _shutdown is False.
- # This represents the case when we have accepted work.
- # 2) _work_item is unset and _shutdown is True.
- # This represents the case where either we timed out before
- # accepting work or explicitly were shutdown without accepting
- # any work.
- # 3) _work_item is set and _shutdown is True.
- # This represents a race where we accepted work and also
- # were shutdown before the worker thread started processing
- # that work. In this case we guarantee to process the work
- # but we don't clear the event ensuring that the next loop
- # around through to the wait() won't block and we will exit
- # since _work_item will be unset.
-
- # We only exit when _work_item is unset to prevent dropping of
- # submitted work.
- if self._work_item is None:
- self._shutdown = True
- return
- if not self._shutdown:
- self._wake_event.clear()
-
- def accepted_work(self, work_item):
- """Returns True if the work was accepted.
+ def assign_work(self, work_item):
+ """Assigns the work item and wakes up the thread.
This method must only be called while the worker is idle.
"""
- with self._lock:
- if self._shutdown:
- return False
-
- self._work_item = work_item
- self._wake_event.set()
- return True
+ self._work_item = work_item
+ self._wake_semaphore.release()
def shutdown(self):
- """Marks this thread as shutdown possibly waking it up if it is idle."""
- with self._lock:
- if self._shutdown:
- return
- self._shutdown = True
- self._wake_event.set()
+ """Wakes up this thread with a 'None' work item, signalling it to shut down."""
+ self._wake_semaphore.release()
class UnboundedThreadPoolExecutor(_base.Executor):
- def __init__(self, permitted_thread_age_in_seconds=30):
- self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
+ def __init__(self):
self._idle_worker_queue = queue.Queue()
+ self._max_idle_threads = 16
self._workers = weakref.WeakSet()
self._shutdown = False
self._lock = threading.Lock() # Guards access to _workers and _shutdown
@@ -137,28 +101,25 @@
"""
future = _base.Future()
work_item = _WorkItem(future, fn, args, kwargs)
- try:
- # Keep trying to get an idle worker from the queue until we find one
- # that accepts the work.
- while not self._idle_worker_queue.get(
- block=False).accepted_work(work_item):
- pass
- return future
- except queue.Empty:
- with self._lock:
- if self._shutdown:
- raise RuntimeError(
- 'Cannot schedule new tasks after thread pool '
- 'has been shutdown.')
+ with self._lock:
+ if self._shutdown:
+ raise RuntimeError(
+ 'Cannot schedule new tasks after thread pool has been shutdown.')
+ try:
+ self._idle_worker_queue.get(block=False).assign_work(work_item)
- worker = _Worker(
- self._idle_worker_queue,
- self._permitted_thread_age_in_seconds,
- work_item)
+ # If we have more idle threads than the max allowed, shut down a thread.
+ if self._idle_worker_queue.qsize() > self._max_idle_threads:
+ try:
+ self._idle_worker_queue.get(block=False).shutdown()
+ except queue.Empty:
+ pass
+ except queue.Empty:
+ worker = _Worker(self._idle_worker_queue, work_item)
worker.daemon = True
worker.start()
self._workers.add(worker)
- return future
+ return future
def shutdown(self, wait=True):
with self._lock:
@@ -166,6 +127,7 @@
return
self._shutdown = True
+
for worker in self._workers:
worker.shutdown()