blob: aba8f5ad0090e0aa84040c19a3b45285bd7625a5 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import sys
import threading
import weakref
from concurrent.futures import _base
try: # Python3
import queue
except Exception: # Python2
import Queue as queue
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self._future = future
self._fn = fn
self._fn_args = args
self._fn_kwargs = kwargs
def run(self):
if self._future.set_running_or_notify_cancel():
# If the future wasn't cancelled, then attempt to execute it.
try:
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
except BaseException as exc:
# Even though Python 2 futures library has #set_exection(),
# the way it generates the traceback doesn't align with
# the way in which Python 3 does it so we provide alternative
# implementations that match our test expectations.
if sys.version_info.major >= 3:
self._future.set_exception(exc)
else:
e, tb = sys.exc_info()[1:]
self._future.set_exception_info(e, tb)
class _Worker(threading.Thread):
def __init__(self, idle_worker_queue, permitted_thread_age_in_seconds,
work_item):
super(_Worker, self).__init__()
self._idle_worker_queue = idle_worker_queue
self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
self._work_item = work_item
self._wake_event = threading.Event()
self._lock = threading.Lock()
self._shutdown = False
def run(self):
while True:
self._work_item.run()
self._work_item = None
# If we are explicitly awake then don't add ourselves back to the
# idle queue. This occurs in case 3 described below.
if not self._wake_event.is_set():
self._idle_worker_queue.put(self)
self._wake_event.wait(self._permitted_thread_age_in_seconds)
with self._lock:
# When we are awoken, we may be in one of three states:
# 1) _work_item is set and _shutdown is False.
# This represents the case when we have accepted work.
# 2) _work_item is unset and _shutdown is True.
# This represents the case where either we timed out before
# accepting work or explicitly were shutdown without accepting
# any work.
# 3) _work_item is set and _shutdown is True.
# This represents a race where we accepted work and also
# were shutdown before the worker thread started processing
# that work. In this case we guarantee to process the work
# but we don't clear the event ensuring that the next loop
# around through to the wait() won't block and we will exit
# since _work_item will be unset.
# We only exit when _work_item is unset to prevent dropping of
# submitted work.
if self._work_item is None:
self._shutdown = True
return
if not self._shutdown:
self._wake_event.clear()
def accepted_work(self, work_item):
"""Returns True if the work was accepted.
This method must only be called while the worker is idle.
"""
with self._lock:
if self._shutdown:
return False
self._work_item = work_item
self._wake_event.set()
return True
def shutdown(self):
"""Marks this thread as shutdown possibly waking it up if it is idle."""
with self._lock:
if self._shutdown:
return
self._shutdown = True
self._wake_event.set()
class UnboundedThreadPoolExecutor(_base.Executor):
def __init__(self, permitted_thread_age_in_seconds=30):
self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
self._idle_worker_queue = queue.Queue()
self._workers = weakref.WeakSet()
self._shutdown = False
self._lock = threading.Lock() # Guards access to _workers and _shutdown
def submit(self, fn, *args, **kwargs):
"""Attempts to submit the work item.
A runtime error is raised if the pool has been shutdown.
"""
future = _base.Future()
work_item = _WorkItem(future, fn, args, kwargs)
try:
# Keep trying to get an idle worker from the queue until we find one
# that accepts the work.
while not self._idle_worker_queue.get(
block=False).accepted_work(work_item):
pass
return future
except queue.Empty:
with self._lock:
if self._shutdown:
raise RuntimeError('Cannot schedule new tasks after thread pool '
'has been shutdown.')
worker = _Worker(
self._idle_worker_queue, self._permitted_thread_age_in_seconds,
work_item)
worker.daemon = True
worker.start()
self._workers.add(worker)
return future
def shutdown(self, wait=True):
with self._lock:
if self._shutdown:
return
self._shutdown = True
for worker in self._workers:
worker.shutdown()
if wait:
for worker in self._workers:
worker.join()