| """ |
| Jobs represent scheduled tasks. |
| """ |
| |
| from threading import Lock |
| from datetime import timedelta |
| |
| from apscheduler.util import to_unicode, ref_to_obj, get_callable_name,\ |
| obj_to_ref |
| |
| |
class MaxInstancesReachedError(Exception):
    """
    Raised when a job already has the maximum allowed number of
    concurrently running instances.
    """
| |
| |
class Job(object):
    """
    Encapsulates the actual Job along with its metadata. Job instances
    are created by the scheduler when adding jobs, and should not be
    directly instantiated. These options can be set when adding jobs
    to the scheduler (see :ref:`job_options`).

    :var trigger: trigger that determines the execution times
    :var func: callable to call when the trigger is triggered
    :var args: list of positional arguments to call func with
    :var kwargs: dict of keyword arguments to call func with
    :var name: name of the job
    :var misfire_grace_time: seconds after the designated run time that
        the job is still allowed to be run
    :var coalesce: run once instead of many times if the scheduler determines
        that the job should be run more than once in succession
    :var max_runs: maximum number of times this job is allowed to be
        triggered
    :var max_instances: maximum number of concurrently running
        instances allowed for this job
    :var runs: number of times this job has been triggered
    :var instances: number of concurrently running instances of this job
    """
    # Class-level defaults; instances only get their own values once
    # something assigns them.
    id = None
    next_run_time = None

    def __init__(self, trigger, func, args, kwargs, misfire_grace_time,
                 coalesce, name=None, max_runs=None, max_instances=1):
        """
        Validates the given options and initializes the job with zero
        runs and zero running instances.

        :raises ValueError: if the trigger is missing, or if
            misfire_grace_time, max_runs or max_instances is not positive
        :raises TypeError: if func is not callable, args is not list-like
            or kwargs is not dict-like
        """
        if not trigger:
            raise ValueError('The trigger must not be None')
        if not hasattr(func, '__call__'):
            raise TypeError('func must be callable')
        if not hasattr(args, '__getitem__'):
            raise TypeError('args must be a list-like object')
        if not hasattr(kwargs, '__getitem__'):
            raise TypeError('kwargs must be a dict-like object')
        if misfire_grace_time <= 0:
            raise ValueError('misfire_grace_time must be a positive value')
        if max_runs is not None and max_runs <= 0:
            raise ValueError('max_runs must be a positive value')
        if max_instances <= 0:
            raise ValueError('max_instances must be a positive value')

        # Guards the ``instances`` counter (see add_instance and
        # remove_instance)
        self._lock = Lock()

        self.trigger = trigger
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.name = to_unicode(name or get_callable_name(func))
        self.misfire_grace_time = misfire_grace_time
        self.coalesce = coalesce
        self.max_runs = max_runs
        self.max_instances = max_instances
        self.runs = 0
        self.instances = 0

    def compute_next_run_time(self, now):
        """
        Updates and returns ``next_run_time``.

        The result is ``None`` once ``runs`` has reached ``max_runs``;
        otherwise it is whatever the trigger reports as the next fire
        time for ``now``.
        """
        # ">=" rather than "==": if ``runs`` ever overshoots ``max_runs``
        # the job must still be treated as exhausted instead of being
        # scheduled indefinitely.
        if self.max_runs is not None and self.runs >= self.max_runs:
            self.next_run_time = None
        else:
            self.next_run_time = self.trigger.get_next_fire_time(now)

        return self.next_run_time

    def get_run_times(self, now):
        """
        Computes the scheduled run times between ``next_run_time`` and ``now``.
        """
        run_times = []
        run_time = self.next_run_time
        # Query the trigger from just past the previous run time,
        # presumably so that it does not return the same fire time again.
        increment = timedelta(microseconds=1)
        while ((not self.max_runs or self.runs < self.max_runs) and
               run_time and run_time <= now):
            run_times.append(run_time)
            run_time = self.trigger.get_next_fire_time(run_time + increment)

        return run_times

    def add_instance(self):
        """
        Increments the running instance counter.

        :raises MaxInstancesReachedError: if ``max_instances`` instances
            are already running
        """
        self._lock.acquire()
        try:
            # ">=" so that an overshoot can never bypass the limit
            if self.instances >= self.max_instances:
                raise MaxInstancesReachedError
            self.instances += 1
        finally:
            self._lock.release()

    def remove_instance(self):
        """
        Decrements the running instance counter.
        """
        self._lock.acquire()
        try:
            assert self.instances > 0, 'Already at 0 instances'
            self.instances -= 1
        finally:
            self._lock.release()

    def __getstate__(self):
        """
        Returns a copy of the instance dict without ``instances``,
        ``func`` and ``_lock``, with the callable stored under
        ``func_ref`` as produced by ``obj_to_ref``.
        """
        # Prevents the unwanted pickling of transient or unpicklable variables
        state = self.__dict__.copy()
        state.pop('instances', None)
        state.pop('func', None)
        state.pop('_lock', None)
        state['func_ref'] = obj_to_ref(self.func)
        return state

    def __setstate__(self, state):
        """
        Restores the job from a state dict as produced by
        ``__getstate__``: resolves ``func_ref`` back into ``func``, resets
        ``instances`` to 0 and creates a fresh lock.
        """
        # Work on a copy so the caller's dict is neither modified nor
        # aliased as this instance's __dict__.
        state = dict(state)
        state['instances'] = 0
        state['func'] = ref_to_obj(state.pop('func_ref'))
        state['_lock'] = Lock()
        self.__dict__ = state

    def __eq__(self, other):
        """
        Two jobs are equal if they are the same object, or if this job
        has an id (not ``None``) and the other job's id equals it.
        """
        if not isinstance(other, Job):
            return NotImplemented
        if self is other:
            return True
        return self.id is not None and other.id == self.id

    def __ne__(self, other):
        # Python 2 does not derive "!=" from __eq__, so define it
        # explicitly to keep the two operators consistent.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __repr__(self):
        return '<Job (name=%s, trigger=%s)>' % (self.name, repr(self.trigger))

    def __str__(self):
        return '%s (trigger: %s, next run at: %s)' % (
            self.name, str(self.trigger), str(self.next_run_time))