blob: b9d627a2d1754ed7fea0238ca67e1808ff7b585b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Threading utilities.
"""
from __future__ import absolute_import # so we can import standard 'threading'
import sys
import itertools
import multiprocessing
from threading import (Thread, Lock)
from Queue import (Queue, Full, Empty)
from .exceptions import print_exception
class ExecutorException(Exception):
    """
    Raised by :class:`FixedThreadPoolExecutor` when a task cannot be submitted or when the
    executor cannot be shut down cleanly.
    """
class DaemonThread(Thread):
    """
    A :class:`threading.Thread` that is always a daemon thread and that silences any exception
    escaping its target, except :class:`SystemExit`.
    """

    def __init__(self, *args, **kwargs):
        super(DaemonThread, self).__init__(*args, **kwargs)
        # Forced on regardless of what the caller passed
        self.daemon = True

    def run(self):
        """
        We're overriding ``Thread.run`` in order to avoid annoying (but harmless) error messages
        during shutdown. The problem is that CPython nullifies the global state _before_ shutting
        down daemon threads, so that exceptions might happen, and then ``Thread.__bootstrap_inner``
        prints them out.

        Our solution is to swallow these exceptions here.

        The side effect is that uncaught exceptions in our own thread code will _not_ be printed out
        as usual, so it's our responsibility to catch them in our code.

        :raises SystemExit: re-raised unchanged if the target raises it
        """
        try:
            super(DaemonThread, self).run()
        except SystemExit:
            # This exception should be bubbled up. A bare ``raise`` keeps the original traceback
            # (``raise e`` would reset it to this frame on Python 2).
            raise
        except BaseException:
            # Exceptions might occur in daemon threads during interpreter shutdown
            pass
# https://gist.github.com/tliron/81dd915166b0bfc64be08b4f8e22c835
class FixedThreadPoolExecutor(object):
    """
    Executes tasks in a fixed thread pool.

    Makes sure to gather all returned results and thrown exceptions in one place, in order of task
    submission.

    Example::

        def add(arg1, arg2):
            return arg1 + arg2

        executor = FixedThreadPoolExecutor(10)
        try:
            for value in range(100):
                executor.submit(add, value, value)
            executor.drain()
        finally:
            executor.close()
        executor.raise_first()
        print(executor.returns)

    You can also use it with the Python ``with`` keyword, in which case you don't need to call
    ``close`` explicitly::

        with FixedThreadPoolExecutor(10) as executor:
            for value in range(100):
                executor.submit(add, value, value)
            executor.drain()
        executor.raise_first()
        print(executor.returns)
    """

    _CYANIDE = object()  # Special task marker used to kill worker threads.

    def __init__(self,
                 size=None,
                 timeout=None,
                 print_exceptions=False):
        """
        :param size: number of threads in the pool; if ``None`` (or 0) will use an optimal number
         for the platform
        :param timeout: timeout in seconds for blocking queue operations and for joining worker
         threads on close (``None`` means no timeout); note that :meth:`drain` cannot honor it
        :param print_exceptions: set to ``True`` in order to print exceptions from tasks
        """
        if not size:
            try:
                size = multiprocessing.cpu_count() * 2 + 1
            except NotImplementedError:
                size = 3

        self.size = size
        self.timeout = timeout
        self.print_exceptions = print_exceptions

        self._tasks = Queue()
        self._returns = {}
        self._exceptions = {}
        self._id_creator = itertools.count()
        self._lock = Lock()  # for console output

        self._workers = []
        for index in range(size):
            worker = DaemonThread(
                name='%s%d' % (self.__class__.__name__, index),
                target=self._thread_worker)
            worker.start()
            self._workers.append(worker)

    def submit(self, func, *args, **kwargs):
        """
        Submit a task for execution.

        The task will be called ASAP on the next available worker thread in the pool.

        :raises ExecutorException: if cannot be submitted (queue full, or executor already closed)
        """
        if self._workers is None:
            # No worker would ever pick the task up, and a later drain() would block forever
            raise ExecutorException('cannot submit task: executor is closed')
        try:
            # Builtin next() works on both Python 2 and 3 (iterator.next() is Python 2 only)
            self._tasks.put((next(self._id_creator), func, args, kwargs), timeout=self.timeout)
        except Full:
            raise ExecutorException('cannot submit task: queue is full')

    def close(self):
        """
        Blocks until all current tasks finish execution and all worker threads are dead.

        You cannot submit tasks anymore after calling this. Calling it again is a no-op.

        This is called automatically upon exit if you are using the ``with`` keyword.

        :raises ExecutorException: if a worker thread is still alive after being told to stop
         (each worker is joined for at most ``timeout`` seconds)
        """
        if self._workers is None:
            # Already closed
            return
        self.drain()
        # After drain() every worker is idle, so exactly one pill per worker is enough to stop them
        for _ in self._workers:
            try:
                self._tasks.put(self._CYANIDE, timeout=self.timeout)
            except Full:
                raise ExecutorException('cannot close executor: a thread seems to be hanging')
        for worker in self._workers:
            worker.join(self.timeout)
        if self.is_alive:
            raise ExecutorException('cannot close executor: a thread seems to be hanging')
        self._workers = None

    def drain(self):
        """
        Blocks until all current tasks finish execution, but leaves the worker threads alive.
        """
        self._tasks.join()  # oddly, the API does not support a timeout parameter

    @property
    def is_alive(self):
        """
        True if any of the worker threads are alive (always ``False`` once closed).
        """
        workers = self._workers
        if workers is None:
            return False
        return any(worker.is_alive() for worker in workers)

    @property
    def returns(self):
        """
        The returned values from all tasks, in order of submission.
        """
        return [self._returns[k] for k in sorted(self._returns)]

    @property
    def exceptions(self):
        """
        The raised exceptions from all tasks, in order of submission.
        """
        return [self._exceptions[k] for k in sorted(self._exceptions)]

    def raise_first(self):
        """
        If exceptions were thrown by any task, then the first one will be raised.

        This is rather arbitrary: proper handling would involve iterating all the exceptions.
        However, if you want to use the "raise" mechanism, you are limited to raising only one of
        them.
        """
        exceptions = self.exceptions
        if exceptions:
            raise exceptions[0]

    def _thread_worker(self):
        # Worker thread main loop: keep executing tasks until a cyanide pill arrives
        while True:
            if not self._execute_next_task():
                break

    def _execute_next_task(self):
        """
        Takes one item from the queue and runs it.

        :returns: ``False`` if the worker should exit, ``True`` to keep looping
        """
        try:
            task = self._tasks.get(timeout=self.timeout)
        except Empty:
            # Happens if timeout is reached
            return True
        if task is self._CYANIDE:
            # Time to die :(
            # The pill counts as a queue item too; without task_done() Queue.join() (drain)
            # would block forever after close
            self._tasks.task_done()
            return False
        self._execute_task(*task)
        return True

    def _execute_task(self, task_id, func, args, kwargs):
        # Records the result or the exception under the task's submission id
        try:
            self._returns[task_id] = func(*args, **kwargs)
        except Exception as e:
            self._exceptions[task_id] = e
            if self.print_exceptions:
                with self._lock:
                    print_exception(e)
        finally:
            # Always mark the task done, even if it raised a BaseException or printing failed,
            # so that drain() and close() cannot block forever
            self._tasks.task_done()

    def __enter__(self):
        return self

    def __exit__(self, the_type, value, traceback):
        self.close()
        return False
class LockedList(list):
    """
    A ``list`` subclass that can be used as a context manager guarding itself with a lock.

    Single list operations will not raise exceptions because of concurrent access, but a
    sequence of them is not atomic. Wrap such a sequence in ``with the_list:`` to hold
    :attr:`lock` for its duration.
    """

    def __init__(self, *args, **kwargs):
        super(LockedList, self).__init__(*args, **kwargs)
        # threading.Lock is not reentrant: nesting ``with`` on the same list in one thread deadlocks
        self.lock = Lock()

    def __enter__(self):
        # Lock.__enter__ is a blocking acquire(); it yields True
        return self.lock.acquire()

    def __exit__(self, the_type, value, traceback):
        # Lock.__exit__ just releases and returns None, so exceptions are never suppressed
        self.lock.release()
class ExceptionThread(Thread):
"""
A thread from which top level exceptions can be retrieved or re-raised.
"""
def __init__(self, *args, **kwargs):
Thread.__init__(self, *args, **kwargs)
self.exception = None
def run(self):
try:
super(ExceptionThread, self).run()
except BaseException:
self.exception = sys.exc_info()
def is_error(self):
return self.exception is not None
def raise_error_if_exists(self):
if self.is_error():
type_, value, trace = self.exception
raise type_, value, trace