</pre><pre class="rust"><code><span class="doccomment">//! The task module.
//! The task module contains the code that manages spawned tasks and provides a
//! safe API for the rest of the runtime to use. Each task in a runtime is
//! stored in an OwnedTasks or LocalOwnedTasks object.
//! # Task reference types
//! A task is usually referenced by multiple handles, and there are several
//! types of handles.
//! * OwnedTask - tasks stored in an OwnedTasks or LocalOwnedTasks are of this
//! reference type.
//! * JoinHandle - each task has a JoinHandle that allows access to the output
//! of the task.
//! * Waker - every waker for a task has this reference type. There can be any
//! number of waker references.
//! * Notified - tracks whether the task is notified.
//! * Unowned - this task reference type is used for tasks not stored in any
//! runtime. Mainly used for blocking tasks, but also in tests.
//! The task uses a reference count to keep track of how many active references
//! exist. The Unowned reference type takes up two ref-counts. All other
//! reference types take up a single ref-count.
//! Besides the waker type, each task has at most one of each reference type.
//! # State
//! The task stores its state in an atomic usize with various bitfields for the
//! necessary information. The state has the following bitfields:
//! * RUNNING - Tracks whether the task is currently being polled or cancelled.
//! This bit functions as a lock around the task.
//! * COMPLETE - Is one once the future has fully completed and has been
//! dropped. Never unset once set. Never set together with RUNNING.
//! * NOTIFIED - Tracks whether a Notified object currently exists.
//! * CANCELLED - Is set to one for tasks that should be cancelled as soon as
//! possible. May take any value for completed tasks.
//! * JOIN_INTEREST - Is set to one if there exists a JoinHandle.
//! * JOIN_WAKER - Acts as an access control bit for the join handle waker. The
//! protocol for its usage is described below.
//! The rest of the bits are used for the ref-count.
//! # Fields in the task
//! The task has various fields. This section describes how and when it is safe
//! to access a field.
//! * The state field is accessed with atomic instructions.
//! * The OwnedTask reference has exclusive access to the `owned` field.
//! * The Notified reference has exclusive access to the `queue_next` field.
//! * The `owner_id` field can be set as part of construction of the task, but
//! is otherwise immutable and anyone can access the field immutably without
//! synchronization.
//! * If COMPLETE is one, then the JoinHandle has exclusive access to the
//! stage field. If COMPLETE is zero, then the RUNNING bitfield functions as
//! a lock for the stage field, and it can be accessed only by the thread
//! that set RUNNING to one.
//! * The waker field may be concurrently accessed by different threads: in one
//! thread the runtime may complete a task and *read* the waker field to
//! invoke the waker, and in another thread the task&#39;s JoinHandle may be
//! polled, and if the task hasn&#39;t yet completed, the JoinHandle may *write*
//! a waker to the waker field. The JOIN_WAKER bit ensures safe access by
//! multiple threads to the waker field using the following rules:
//! 1. JOIN_WAKER is initialized to zero.
//! 2. If JOIN_WAKER is zero, then the JoinHandle has exclusive (mutable)
//! access to the waker field.
//! 3. If JOIN_WAKER is one, then the JoinHandle has shared (read-only)
//! access to the waker field.
//! 4. If JOIN_WAKER is one and COMPLETE is one, then the runtime has shared
//! (read-only) access to the waker field.
//! 5. If the JoinHandle needs to write to the waker field, then the
//! JoinHandle needs to (i) successfully set JOIN_WAKER to zero if it is
//! not already zero to gain exclusive access to the waker field per rule
//! 2, (ii) write a waker, and (iii) successfully set JOIN_WAKER to one.
//! 6. The JoinHandle can change JOIN_WAKER only if COMPLETE is zero (i.e.
//! the task hasn&#39;t yet completed).
//! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a
//! race. If step (i) fails, then the attempt to write a waker is aborted. If
//! step (iii) fails because COMPLETE is set to one by another thread after
//! step (i), then the waker field is cleared. Once COMPLETE is one (i.e.
//! task has completed), the JoinHandle will not modify JOIN_WAKER. After the
//! runtime sets COMPLETE to one, it invokes the waker if there is one.
//! All other fields are immutable and can be accessed immutably without
//! synchronization by anyone.
//! # Safety
//! This section goes through various situations and explains why the API is
//! safe in that situation.
//! ## Polling or dropping the future
//! Any mutable access to the future happens after obtaining a lock by modifying
//! the RUNNING field, so exclusive access is ensured.
//! When the task completes, exclusive access to the output is transferred to
//! the JoinHandle. If the JoinHandle is already dropped when the transition to
//! complete happens, the thread performing that transition retains exclusive
//! access to the output and should immediately drop it.
//! ## Non-Send futures
//! If a future is not Send, then it is bound to a LocalOwnedTasks. The future
//! will only ever be polled or dropped given a LocalNotified or inside a call
//! to LocalOwnedTasks::shutdown_all. In either case, it is guaranteed that the
//! future is on the right thread.
//! If the task is never removed from the LocalOwnedTasks, then it is leaked, so
//! there is no risk that the task is dropped on some other thread when the last
//! ref-count drops.
//! ## Non-Send output
//! When a task completes, the output is placed in the stage of the task. Then,
//! a transition that sets COMPLETE to true is performed, and the value of
//! JOIN_INTEREST when this transition happens is read.
//! If JOIN_INTEREST is zero when the transition to COMPLETE happens, then the
//! output is immediately dropped.
//! If JOIN_INTEREST is one when the transition to COMPLETE happens, then the
//! JoinHandle is responsible for cleaning up the output. If the output is not
//! Send, then this happens:
//! 1. The output is created on the thread that the future was polled on. Since
//! only non-Send futures can have non-Send output, the future was polled on
//! the thread that the future was spawned from.
//! 2. Since `JoinHandle&lt;Output&gt;` is not Send if Output is not Send, the
//! JoinHandle is also on the thread that the future was spawned from.
//! 3. Thus, the JoinHandle will not move the output across threads when it
//! takes or drops the output.
//! ## Recursive poll/shutdown
//! Calling poll from inside a shutdown call or vice-versa is not prevented by
//! the API exposed by the task module, so this has to be safe. In either case,
//! the lock in the RUNNING bitfield makes the inner call return immediately. If
//! the inner call is a `shutdown` call, then the CANCELLED bit is set, and the
//! poll call will notice it when the poll finishes, and the task is cancelled
//! at that point.
</span><span class="comment">// Some task infrastructure is here to support `JoinSet`, which is currently
// unstable. This should be removed once `JoinSet` is stabilized.
</span><span class="attribute">#![cfg_attr(not(tokio_unstable), allow(dead_code))]
</span><span class="kw">mod </span>core;
<span class="kw">use </span><span class="self">self</span>::core::Cell;
<span class="kw">use </span><span class="self">self</span>::core::Header;
<span class="kw">mod </span>error;
<span class="kw">pub use </span><span class="self">self</span>::error::JoinError;
<span class="kw">mod </span>harness;
<span class="kw">use </span><span class="self">self</span>::harness::Harness;
<span class="kw">mod </span>id;
<span class="attribute">#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
</span><span class="kw">pub use </span>id::{id, try_id, Id};
<span class="macro">cfg_rt_multi_thread! </span>{
<span class="kw">mod </span>inject;
<span class="kw">pub</span>(<span class="kw">super</span>) <span class="kw">use </span><span class="self">self</span>::inject::Inject;
<span class="attribute">#[cfg(feature = <span class="string">&quot;rt&quot;</span>)]
</span><span class="kw">mod </span>abort;
<span class="kw">mod </span>join;
<span class="attribute">#[cfg(feature = <span class="string">&quot;rt&quot;</span>)]
</span><span class="kw">pub use </span><span class="self">self</span>::abort::AbortHandle;
<span class="kw">pub use </span><span class="self">self</span>::join::JoinHandle;
<span class="kw">mod </span>list;
<span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">use </span><span class="self">self</span>::list::{LocalOwnedTasks, OwnedTasks};
<span class="kw">mod </span>raw;
<span class="kw">use </span><span class="self">self</span>::raw::RawTask;
<span class="kw">mod </span>state;
<span class="kw">use </span><span class="self">self</span>::state::State;
<span class="kw">mod </span>waker;
<span class="kw">use </span><span class="kw">crate</span>::future::Future;
<span class="kw">use </span><span class="kw">crate</span>::util::linked_list;
<span class="kw">use </span>std::marker::PhantomData;
<span class="kw">use </span>std::ptr::NonNull;
<span class="kw">use </span>std::{fmt, mem};
<span class="doccomment">/// An owned handle to the task, tracked by ref count.
</span><span class="attribute">#[repr(transparent)]
</span><span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">struct </span>Task&lt;S: <span class="lifetime">&#39;static</span>&gt; {
raw: RawTask,
_p: PhantomData&lt;S&gt;,
<span class="kw">unsafe impl</span>&lt;S&gt; Send <span class="kw">for </span>Task&lt;S&gt; {}
<span class="kw">unsafe impl</span>&lt;S&gt; Sync <span class="kw">for </span>Task&lt;S&gt; {}
<span class="doccomment">/// A task was notified.
</span><span class="attribute">#[repr(transparent)]
</span><span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">struct </span>Notified&lt;S: <span class="lifetime">&#39;static</span>&gt;(Task&lt;S&gt;);
<span class="comment">// safety: This type cannot be used to touch the task without first verifying
// that the value is on a thread where it is safe to poll the task.
</span><span class="kw">unsafe impl</span>&lt;S: Schedule&gt; Send <span class="kw">for </span>Notified&lt;S&gt; {}
<span class="kw">unsafe impl</span>&lt;S: Schedule&gt; Sync <span class="kw">for </span>Notified&lt;S&gt; {}
<span class="doccomment">/// A non-Send variant of Notified with the invariant that it is on a thread
/// where it is safe to poll it.
</span><span class="attribute">#[repr(transparent)]
</span><span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">struct </span>LocalNotified&lt;S: <span class="lifetime">&#39;static</span>&gt; {
task: Task&lt;S&gt;,
_not_send: PhantomData&lt;<span class="kw-2">*const </span>()&gt;,
<span class="doccomment">/// A task that is not owned by any OwnedTasks. Used for blocking tasks.
/// This type holds two ref-counts.
</span><span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">struct </span>UnownedTask&lt;S: <span class="lifetime">&#39;static</span>&gt; {
raw: RawTask,
_p: PhantomData&lt;S&gt;,
<span class="comment">// safety: This type can only be created given a Send task.
</span><span class="kw">unsafe impl</span>&lt;S&gt; Send <span class="kw">for </span>UnownedTask&lt;S&gt; {}
<span class="kw">unsafe impl</span>&lt;S&gt; Sync <span class="kw">for </span>UnownedTask&lt;S&gt; {}
<span class="doccomment">/// Task result sent back.
</span><span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">type </span><span class="prelude-ty">Result</span>&lt;T&gt; = std::result::Result&lt;T, JoinError&gt;;
<span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">trait </span>Schedule: Sync + Sized + <span class="lifetime">&#39;static </span>{
<span class="doccomment">/// The task has completed work and is ready to be released. The scheduler
/// should release it immediately and return it. The task module will batch
/// the ref-dec with setting other options.
/// If the scheduler has already released the task, then None is returned.
</span><span class="kw">fn </span>release(<span class="kw-2">&amp;</span><span class="self">self</span>, task: <span class="kw-2">&amp;</span>Task&lt;<span class="self">Self</span>&gt;) -&gt; <span class="prelude-ty">Option</span>&lt;Task&lt;<span class="self">Self</span>&gt;&gt;;
<span class="doccomment">/// Schedule the task
</span><span class="kw">fn </span>schedule(<span class="kw-2">&amp;</span><span class="self">self</span>, task: Notified&lt;<span class="self">Self</span>&gt;);
<span class="doccomment">/// Schedule the task to run in the near future, yielding the thread to
/// other tasks.
</span><span class="kw">fn </span>yield_now(<span class="kw-2">&amp;</span><span class="self">self</span>, task: Notified&lt;<span class="self">Self</span>&gt;) {
<span class="self">self</span>.schedule(task);
<span class="doccomment">/// Polling the task resulted in a panic. Should the runtime shutdown?
</span><span class="kw">fn </span>unhandled_panic(<span class="kw-2">&amp;</span><span class="self">self</span>) {
<span class="comment">// By default, do nothing. This maintains the 1.0 behavior.
<span class="macro">cfg_rt! </span>{
<span class="doccomment">/// This is the constructor for a new task. Three references to the task are
/// created. The first task reference is usually put into an OwnedTasks
/// immediately. The Notified is sent to the scheduler as an ordinary
/// notification.
</span><span class="kw">fn </span>new_task&lt;T, S&gt;(
task: T,
scheduler: S,
id: Id,
) -&gt; (Task&lt;S&gt;, Notified&lt;S&gt;, JoinHandle&lt;T::Output&gt;)
<span class="kw">where
</span>S: Schedule,
T: Future + <span class="lifetime">&#39;static</span>,
T::Output: <span class="lifetime">&#39;static</span>,
<span class="kw">let </span>raw = RawTask::new::&lt;T, S&gt;(task, scheduler, id);
<span class="kw">let </span>task = Task {
_p: PhantomData,
<span class="kw">let </span>notified = Notified(Task {
_p: PhantomData,
<span class="kw">let </span>join = JoinHandle::new(raw);
(task, notified, join)
<span class="doccomment">/// Creates a new task with an associated join handle. This method is used
/// only when the task is not going to be stored in an `OwnedTasks` list.
/// Currently only blocking tasks use this method.
</span><span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">fn </span>unowned&lt;T, S&gt;(task: T, scheduler: S, id: Id) -&gt; (UnownedTask&lt;S&gt;, JoinHandle&lt;T::Output&gt;)
<span class="kw">where
</span>S: Schedule,
T: Send + Future + <span class="lifetime">&#39;static</span>,
T::Output: Send + <span class="lifetime">&#39;static</span>,
<span class="kw">let </span>(task, notified, join) = new_task(task, scheduler, id);
<span class="comment">// This transfers the ref-count of task and notified into an UnownedTask.
// This is valid because an UnownedTask holds two ref-counts.
</span><span class="kw">let </span>unowned = UnownedTask {
raw: task.raw,
_p: PhantomData,
(unowned, join)
<span class="kw">impl</span>&lt;S: <span class="lifetime">&#39;static</span>&gt; Task&lt;S&gt; {
<span class="kw">unsafe fn </span>from_raw(ptr: NonNull&lt;Header&gt;) -&gt; Task&lt;S&gt; {
Task {
raw: RawTask::from_raw(ptr),
_p: PhantomData,
<span class="kw">fn </span>header(<span class="kw-2">&amp;</span><span class="self">self</span>) -&gt; <span class="kw-2">&amp;</span>Header {
<span class="self">self</span>.raw.header()
<span class="kw">fn </span>header_ptr(<span class="kw-2">&amp;</span><span class="self">self</span>) -&gt; NonNull&lt;Header&gt; {
<span class="self">self</span>.raw.header_ptr()
<span class="kw">impl</span>&lt;S: <span class="lifetime">&#39;static</span>&gt; Notified&lt;S&gt; {
<span class="kw">fn </span>header(<span class="kw-2">&amp;</span><span class="self">self</span>) -&gt; <span class="kw-2">&amp;</span>Header {
<span class="self">self</span>.<span class="number">0</span>.header()
<span class="macro">cfg_rt_multi_thread! </span>{
<span class="kw">impl</span>&lt;S: <span class="lifetime">&#39;static</span>&gt; Notified&lt;S&gt; {
<span class="kw">unsafe fn </span>from_raw(ptr: NonNull&lt;Header&gt;) -&gt; Notified&lt;S&gt; {
<span class="kw">impl</span>&lt;S: <span class="lifetime">&#39;static</span>&gt; Task&lt;S&gt; {
<span class="kw">fn </span>into_raw(<span class="self">self</span>) -&gt; NonNull&lt;Header&gt; {
<span class="kw">let </span>ret = <span class="self">self</span>.raw.header_ptr();
mem::forget(<span class="self">self</span>);
<span class="kw">impl</span>&lt;S: <span class="lifetime">&#39;static</span>&gt; Notified&lt;S&gt; {
<span class="kw">fn </span>into_raw(<span class="self">self</span>) -&gt; NonNull&lt;Header&gt; {
<span class="self">self</span>.<span class="number">0</span>.into_raw()
<span class="kw">impl</span>&lt;S: Schedule&gt; Task&lt;S&gt; {
<span class="doccomment">/// Preemptively cancels the task as part of the shutdown process.
</span><span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">fn </span>shutdown(<span class="self">self</span>) {
<span class="kw">let </span>raw = <span class="self">self</span>.raw;
mem::forget(<span class="self">self</span>);
<span class="kw">impl</span>&lt;S: Schedule&gt; LocalNotified&lt;S&gt; {
<span class="doccomment">/// Runs the task.
</span><span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">fn </span>run(<span class="self">self</span>) {
<span class="kw">let </span>raw = <span class="self">self</span>.task.raw;
mem::forget(<span class="self">self</span>);
<span class="kw">impl</span>&lt;S: Schedule&gt; UnownedTask&lt;S&gt; {
<span class="comment">// Used in test of the inject queue.
</span><span class="attribute">#[cfg(test)]
#[cfg_attr(tokio_wasm, allow(dead_code))]
</span><span class="kw">pub</span>(<span class="kw">super</span>) <span class="kw">fn </span>into_notified(<span class="self">self</span>) -&gt; Notified&lt;S&gt; {
Notified(<span class="self">self</span>.into_task())
<span class="kw">fn </span>into_task(<span class="self">self</span>) -&gt; Task&lt;S&gt; {
<span class="comment">// Convert into a task.
</span><span class="kw">let </span>task = Task {
raw: <span class="self">self</span>.raw,
_p: PhantomData,
mem::forget(<span class="self">self</span>);
<span class="comment">// Drop a ref-count since an UnownedTask holds two.
<span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">fn </span>run(<span class="self">self</span>) {
<span class="kw">let </span>raw = <span class="self">self</span>.raw;
mem::forget(<span class="self">self</span>);
<span class="comment">// Transfer one ref-count to a Task object.
</span><span class="kw">let </span>task = Task::&lt;S&gt; {
_p: PhantomData,
<span class="comment">// Use the other ref-count to poll the task.
<span class="comment">// Decrement our extra ref-count
<span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">fn </span>shutdown(<span class="self">self</span>) {
<span class="self">self</span>.into_task().shutdown()
<span class="kw">impl</span>&lt;S: <span class="lifetime">&#39;static</span>&gt; Drop <span class="kw">for </span>Task&lt;S&gt; {
<span class="kw">fn </span>drop(<span class="kw-2">&amp;mut </span><span class="self">self</span>) {
<span class="comment">// Decrement the ref count
</span><span class="kw">if </span><span class="self">self</span>.header().state.ref_dec() {
<span class="comment">// Deallocate if this is the final ref count
</span><span class="self">self</span>.raw.dealloc();
<span class="kw">impl</span>&lt;S: <span class="lifetime">&#39;static</span>&gt; Drop <span class="kw">for </span>UnownedTask&lt;S&gt; {
<span class="kw">fn </span>drop(<span class="kw-2">&amp;mut </span><span class="self">self</span>) {
<span class="comment">// Decrement the ref count
</span><span class="kw">if </span><span class="self">self</span>.raw.header().state.ref_dec_twice() {
<span class="comment">// Deallocate if this is the final ref count
</span><span class="self">self</span>.raw.dealloc();
<span class="kw">impl</span>&lt;S&gt; fmt::Debug <span class="kw">for </span>Task&lt;S&gt; {
<span class="kw">fn </span>fmt(<span class="kw-2">&amp;</span><span class="self">self</span>, fmt: <span class="kw-2">&amp;mut </span>fmt::Formatter&lt;<span class="lifetime">&#39;_</span>&gt;) -&gt; fmt::Result {
<span class="macro">write!</span>(fmt, <span class="string">&quot;Task({:p})&quot;</span>, <span class="self">self</span>.header())
<span class="kw">impl</span>&lt;S&gt; fmt::Debug <span class="kw">for </span>Notified&lt;S&gt; {
<span class="kw">fn </span>fmt(<span class="kw-2">&amp;</span><span class="self">self</span>, fmt: <span class="kw-2">&amp;mut </span>fmt::Formatter&lt;<span class="lifetime">&#39;_</span>&gt;) -&gt; fmt::Result {
<span class="macro">write!</span>(fmt, <span class="string">&quot;task::Notified({:p})&quot;</span>, <span class="self">self</span>.<span class="number">0</span>.header())
<span class="doccomment">/// # Safety
/// Tasks are pinned.
</span><span class="kw">unsafe impl</span>&lt;S&gt; linked_list::Link <span class="kw">for </span>Task&lt;S&gt; {
<span class="kw">type </span>Handle = Task&lt;S&gt;;
<span class="kw">type </span>Target = Header;
<span class="kw">fn </span>as_raw(handle: <span class="kw-2">&amp;</span>Task&lt;S&gt;) -&gt; NonNull&lt;Header&gt; {
<span class="kw">unsafe fn </span>from_raw(ptr: NonNull&lt;Header&gt;) -&gt; Task&lt;S&gt; {
<span class="kw">unsafe fn </span>pointers(target: NonNull&lt;Header&gt;) -&gt; NonNull&lt;linked_list::Pointers&lt;Header&gt;&gt; {
<span class="self">self</span>::core::Trailer::addr_of_owned(Header::get_trailer(target))
self::core::Trailer::addr_of_owned(Header::get_trailer(target))