tokio/runtime/task/
state.rs

1use crate::loom::sync::atomic::AtomicUsize;
2
3use std::fmt;
4use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5
6pub(super) struct State {
7    val: AtomicUsize,
8}
9
10/// Current state value.
11#[derive(Copy, Clone)]
12pub(super) struct Snapshot(usize);
13
14type UpdateResult = Result<Snapshot, Snapshot>;
15
16/// The task is currently being run.
17const RUNNING: usize = 0b0001;
18
19/// The task is complete.
20///
21/// Once this bit is set, it is never unset.
22const COMPLETE: usize = 0b0010;
23
24/// Extracts the task's lifecycle value from the state.
25const LIFECYCLE_MASK: usize = 0b11;
26
27/// Flag tracking if the task has been pushed into a run queue.
28const NOTIFIED: usize = 0b100;
29
30/// The join handle is still around.
31const JOIN_INTEREST: usize = 0b1_000;
32
33/// A join handle waker has been set.
34const JOIN_WAKER: usize = 0b10_000;
35
36/// The task has been forcibly cancelled.
37const CANCELLED: usize = 0b100_000;
38
39/// All bits.
40const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
41
42/// Bits used by the ref count portion of the state.
43const REF_COUNT_MASK: usize = !STATE_MASK;
44
45/// Number of positions to shift the ref count.
46const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
47
48/// One ref count.
49const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
50
51/// State a task is initialized with.
52///
53/// A task is initialized with three references:
54///
55///  * A reference that will be stored in an `OwnedTasks` or `LocalOwnedTasks`.
56///  * A reference that will be sent to the scheduler as an ordinary notification.
57///  * A reference for the `JoinHandle`.
58///
59/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
60/// As the task starts with a `Notified`, `NOTIFIED` is set.
61const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
62
63#[must_use]
64pub(super) enum TransitionToRunning {
65    Success,
66    Cancelled,
67    Failed,
68    Dealloc,
69}
70
71#[must_use]
72pub(super) enum TransitionToIdle {
73    Ok,
74    OkNotified,
75    OkDealloc,
76    Cancelled,
77}
78
79#[must_use]
80pub(super) enum TransitionToNotifiedByVal {
81    DoNothing,
82    Submit,
83    Dealloc,
84}
85
86#[must_use]
87pub(crate) enum TransitionToNotifiedByRef {
88    DoNothing,
89    Submit,
90}
91
92/// All transitions are performed via RMW operations. This establishes an
93/// unambiguous modification order.
94impl State {
95    /// Returns a task's initial state.
96    pub(super) fn new() -> State {
97        // The raw task returned by this method has a ref-count of three. See
98        // the comment on INITIAL_STATE for more.
99        State {
100            val: AtomicUsize::new(INITIAL_STATE),
101        }
102    }
103
104    /// Loads the current state, establishes `Acquire` ordering.
105    pub(super) fn load(&self) -> Snapshot {
106        Snapshot(self.val.load(Acquire))
107    }
108
109    /// Attempts to transition the lifecycle to `Running`. This sets the
110    /// notified bit to false so notifications during the poll can be detected.
111    pub(super) fn transition_to_running(&self) -> TransitionToRunning {
112        self.fetch_update_action(|mut next| {
113            let action;
114            assert!(next.is_notified());
115
116            if !next.is_idle() {
117                // This happens if the task is either currently running or if it
118                // has already completed, e.g. if it was cancelled during
119                // shutdown. Consume the ref-count and return.
120                next.ref_dec();
121                if next.ref_count() == 0 {
122                    action = TransitionToRunning::Dealloc;
123                } else {
124                    action = TransitionToRunning::Failed;
125                }
126            } else {
127                // We are able to lock the RUNNING bit.
128                next.set_running();
129                next.unset_notified();
130
131                if next.is_cancelled() {
132                    action = TransitionToRunning::Cancelled;
133                } else {
134                    action = TransitionToRunning::Success;
135                }
136            }
137            (action, Some(next))
138        })
139    }
140
141    /// Transitions the task from `Running` -> `Idle`.
142    ///
143    /// The transition to `Idle` fails if the task has been flagged to be
144    /// cancelled.
145    pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
146        self.fetch_update_action(|curr| {
147            assert!(curr.is_running());
148
149            if curr.is_cancelled() {
150                return (TransitionToIdle::Cancelled, None);
151            }
152
153            let mut next = curr;
154            let action;
155            next.unset_running();
156
157            if !next.is_notified() {
158                // Polling the future consumes the ref-count of the Notified.
159                next.ref_dec();
160                if next.ref_count() == 0 {
161                    action = TransitionToIdle::OkDealloc;
162                } else {
163                    action = TransitionToIdle::Ok;
164                }
165            } else {
166                // The caller will schedule a new notification, so we create a
167                // new ref-count for the notification. Our own ref-count is kept
168                // for now, and the caller will drop it shortly.
169                next.ref_inc();
170                action = TransitionToIdle::OkNotified;
171            }
172
173            (action, Some(next))
174        })
175    }
176
177    /// Transitions the task from `Running` -> `Complete`.
178    pub(super) fn transition_to_complete(&self) -> Snapshot {
179        const DELTA: usize = RUNNING | COMPLETE;
180
181        let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
182        assert!(prev.is_running());
183        assert!(!prev.is_complete());
184
185        Snapshot(prev.0 ^ DELTA)
186    }
187
188    /// Transitions from `Complete` -> `Terminal`, decrementing the reference
189    /// count the specified number of times.
190    ///
191    /// Returns true if the task should be deallocated.
192    pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
193        let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
194        assert!(
195            prev.ref_count() >= count,
196            "current: {}, sub: {}",
197            prev.ref_count(),
198            count
199        );
200        prev.ref_count() == count
201    }
202
203    /// Transitions the state to `NOTIFIED`.
204    ///
205    /// If no task needs to be submitted, a ref-count is consumed.
206    ///
207    /// If a task needs to be submitted, the ref-count is incremented for the
208    /// new Notified.
209    pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
210        self.fetch_update_action(|mut snapshot| {
211            let action;
212
213            if snapshot.is_running() {
214                // If the task is running, we mark it as notified, but we should
215                // not submit anything as the thread currently running the
216                // future is responsible for that.
217                snapshot.set_notified();
218                snapshot.ref_dec();
219
220                // The thread that set the running bit also holds a ref-count.
221                assert!(snapshot.ref_count() > 0);
222
223                action = TransitionToNotifiedByVal::DoNothing;
224            } else if snapshot.is_complete() || snapshot.is_notified() {
225                // We do not need to submit any notifications, but we have to
226                // decrement the ref-count.
227                snapshot.ref_dec();
228
229                if snapshot.ref_count() == 0 {
230                    action = TransitionToNotifiedByVal::Dealloc;
231                } else {
232                    action = TransitionToNotifiedByVal::DoNothing;
233                }
234            } else {
235                // We create a new notified that we can submit. The caller
236                // retains ownership of the ref-count they passed in.
237                snapshot.set_notified();
238                snapshot.ref_inc();
239                action = TransitionToNotifiedByVal::Submit;
240            }
241
242            (action, Some(snapshot))
243        })
244    }
245
246    /// Transitions the state to `NOTIFIED`.
247    pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
248        self.fetch_update_action(|mut snapshot| {
249            if snapshot.is_complete() || snapshot.is_notified() {
250                // There is nothing to do in this case.
251                (TransitionToNotifiedByRef::DoNothing, None)
252            } else if snapshot.is_running() {
253                // If the task is running, we mark it as notified, but we should
254                // not submit as the thread currently running the future is
255                // responsible for that.
256                snapshot.set_notified();
257                (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
258            } else {
259                // The task is idle and not notified. We should submit a
260                // notification.
261                snapshot.set_notified();
262                snapshot.ref_inc();
263                (TransitionToNotifiedByRef::Submit, Some(snapshot))
264            }
265        })
266    }
267
268    /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref
269    /// count.
270    ///
271    /// Returns `true` if the notified bit was transitioned from `0` to `1`;
272    /// otherwise `false.`
273    #[cfg(all(
274        tokio_unstable,
275        tokio_taskdump,
276        feature = "rt",
277        target_os = "linux",
278        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
279    ))]
280    pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
281        self.fetch_update_action(|mut snapshot| {
282            if snapshot.is_notified() {
283                (false, None)
284            } else {
285                snapshot.set_notified();
286                snapshot.ref_inc();
287                (true, Some(snapshot))
288            }
289        })
290    }
291
292    /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
293    ///
294    /// Returns `true` if the task needs to be submitted to the pool for
295    /// execution.
296    pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
297        self.fetch_update_action(|mut snapshot| {
298            if snapshot.is_cancelled() || snapshot.is_complete() {
299                // Aborts to completed or cancelled tasks are no-ops.
300                (false, None)
301            } else if snapshot.is_running() {
302                // If the task is running, we mark it as cancelled. The thread
303                // running the task will notice the cancelled bit when it
304                // stops polling and it will kill the task.
305                //
306                // The set_notified() call is not strictly necessary but it will
307                // in some cases let a wake_by_ref call return without having
308                // to perform a compare_exchange.
309                snapshot.set_notified();
310                snapshot.set_cancelled();
311                (false, Some(snapshot))
312            } else {
313                // The task is idle. We set the cancelled and notified bits and
314                // submit a notification if the notified bit was not already
315                // set.
316                snapshot.set_cancelled();
317                if !snapshot.is_notified() {
318                    snapshot.set_notified();
319                    snapshot.ref_inc();
320                    (true, Some(snapshot))
321                } else {
322                    (false, Some(snapshot))
323                }
324            }
325        })
326    }
327
328    /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
329    ///
330    /// Returns `true` if the transition to `Running` succeeded.
331    pub(super) fn transition_to_shutdown(&self) -> bool {
332        let mut prev = Snapshot(0);
333
334        let _ = self.fetch_update(|mut snapshot| {
335            prev = snapshot;
336
337            if snapshot.is_idle() {
338                snapshot.set_running();
339            }
340
341            // If the task was not idle, the thread currently running the task
342            // will notice the cancelled bit and cancel it once the poll
343            // completes.
344            snapshot.set_cancelled();
345            Some(snapshot)
346        });
347
348        prev.is_idle()
349    }
350
351    /// Optimistically tries to swap the state assuming the join handle is
352    /// __immediately__ dropped on spawn.
353    pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
354        use std::sync::atomic::Ordering::Relaxed;
355
356        // Relaxed is acceptable as if this function is called and succeeds,
357        // then nothing has been done w/ the join handle.
358        //
359        // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
360        // set, at which point the CAS will fail.
361        //
362        // Given this, there is no risk if this operation is reordered.
363        self.val
364            .compare_exchange_weak(
365                INITIAL_STATE,
366                (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
367                Release,
368                Relaxed,
369            )
370            .map(|_| ())
371            .map_err(|_| ())
372    }
373
374    /// Tries to unset the `JOIN_INTEREST` flag.
375    ///
376    /// Returns `Ok` if the operation happens before the task transitions to a
377    /// completed state, `Err` otherwise.
378    pub(super) fn unset_join_interested(&self) -> UpdateResult {
379        self.fetch_update(|curr| {
380            assert!(curr.is_join_interested());
381
382            if curr.is_complete() {
383                return None;
384            }
385
386            let mut next = curr;
387            next.unset_join_interested();
388
389            Some(next)
390        })
391    }
392
393    /// Sets the `JOIN_WAKER` bit.
394    ///
395    /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
396    /// the task has completed.
397    pub(super) fn set_join_waker(&self) -> UpdateResult {
398        self.fetch_update(|curr| {
399            assert!(curr.is_join_interested());
400            assert!(!curr.is_join_waker_set());
401
402            if curr.is_complete() {
403                return None;
404            }
405
406            let mut next = curr;
407            next.set_join_waker();
408
409            Some(next)
410        })
411    }
412
413    /// Unsets the `JOIN_WAKER` bit.
414    ///
415    /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if
416    /// the task has completed.
417    pub(super) fn unset_waker(&self) -> UpdateResult {
418        self.fetch_update(|curr| {
419            assert!(curr.is_join_interested());
420            assert!(curr.is_join_waker_set());
421
422            if curr.is_complete() {
423                return None;
424            }
425
426            let mut next = curr;
427            next.unset_join_waker();
428
429            Some(next)
430        })
431    }
432
433    pub(super) fn ref_inc(&self) {
434        use std::process;
435        use std::sync::atomic::Ordering::Relaxed;
436
437        // Using a relaxed ordering is alright here, as knowledge of the
438        // original reference prevents other threads from erroneously deleting
439        // the object.
440        //
441        // As explained in the [Boost documentation][1], Increasing the
442        // reference counter can always be done with memory_order_relaxed: New
443        // references to an object can only be formed from an existing
444        // reference, and passing an existing reference from one thread to
445        // another must already provide any required synchronization.
446        //
447        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
448        let prev = self.val.fetch_add(REF_ONE, Relaxed);
449
450        // If the reference count overflowed, abort.
451        if prev > isize::MAX as usize {
452            process::abort();
453        }
454    }
455
456    /// Returns `true` if the task should be released.
457    pub(super) fn ref_dec(&self) -> bool {
458        let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
459        assert!(prev.ref_count() >= 1);
460        prev.ref_count() == 1
461    }
462
463    /// Returns `true` if the task should be released.
464    pub(super) fn ref_dec_twice(&self) -> bool {
465        let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
466        assert!(prev.ref_count() >= 2);
467        prev.ref_count() == 2
468    }
469
470    fn fetch_update_action<F, T>(&self, mut f: F) -> T
471    where
472        F: FnMut(Snapshot) -> (T, Option<Snapshot>),
473    {
474        let mut curr = self.load();
475
476        loop {
477            let (output, next) = f(curr);
478            let next = match next {
479                Some(next) => next,
480                None => return output,
481            };
482
483            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
484
485            match res {
486                Ok(_) => return output,
487                Err(actual) => curr = Snapshot(actual),
488            }
489        }
490    }
491
492    fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
493    where
494        F: FnMut(Snapshot) -> Option<Snapshot>,
495    {
496        let mut curr = self.load();
497
498        loop {
499            let next = match f(curr) {
500                Some(next) => next,
501                None => return Err(curr),
502            };
503
504            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
505
506            match res {
507                Ok(_) => return Ok(next),
508                Err(actual) => curr = Snapshot(actual),
509            }
510        }
511    }
512}
513
514// ===== impl Snapshot =====
515
516impl Snapshot {
517    /// Returns `true` if the task is in an idle state.
518    pub(super) fn is_idle(self) -> bool {
519        self.0 & (RUNNING | COMPLETE) == 0
520    }
521
522    /// Returns `true` if the task has been flagged as notified.
523    pub(super) fn is_notified(self) -> bool {
524        self.0 & NOTIFIED == NOTIFIED
525    }
526
527    fn unset_notified(&mut self) {
528        self.0 &= !NOTIFIED;
529    }
530
531    fn set_notified(&mut self) {
532        self.0 |= NOTIFIED;
533    }
534
535    pub(super) fn is_running(self) -> bool {
536        self.0 & RUNNING == RUNNING
537    }
538
539    fn set_running(&mut self) {
540        self.0 |= RUNNING;
541    }
542
543    fn unset_running(&mut self) {
544        self.0 &= !RUNNING;
545    }
546
547    pub(super) fn is_cancelled(self) -> bool {
548        self.0 & CANCELLED == CANCELLED
549    }
550
551    fn set_cancelled(&mut self) {
552        self.0 |= CANCELLED;
553    }
554
555    /// Returns `true` if the task's future has completed execution.
556    pub(super) fn is_complete(self) -> bool {
557        self.0 & COMPLETE == COMPLETE
558    }
559
560    pub(super) fn is_join_interested(self) -> bool {
561        self.0 & JOIN_INTEREST == JOIN_INTEREST
562    }
563
564    fn unset_join_interested(&mut self) {
565        self.0 &= !JOIN_INTEREST;
566    }
567
568    pub(super) fn is_join_waker_set(self) -> bool {
569        self.0 & JOIN_WAKER == JOIN_WAKER
570    }
571
572    fn set_join_waker(&mut self) {
573        self.0 |= JOIN_WAKER;
574    }
575
576    fn unset_join_waker(&mut self) {
577        self.0 &= !JOIN_WAKER;
578    }
579
580    pub(super) fn ref_count(self) -> usize {
581        (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
582    }
583
584    fn ref_inc(&mut self) {
585        assert!(self.0 <= isize::MAX as usize);
586        self.0 += REF_ONE;
587    }
588
589    pub(super) fn ref_dec(&mut self) {
590        assert!(self.ref_count() > 0);
591        self.0 -= REF_ONE;
592    }
593}
594
595impl fmt::Debug for State {
596    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
597        let snapshot = self.load();
598        snapshot.fmt(fmt)
599    }
600}
601
602impl fmt::Debug for Snapshot {
603    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
604        fmt.debug_struct("Snapshot")
605            .field("is_running", &self.is_running())
606            .field("is_complete", &self.is_complete())
607            .field("is_notified", &self.is_notified())
608            .field("is_cancelled", &self.is_cancelled())
609            .field("is_join_interested", &self.is_join_interested())
610            .field("is_join_waker_set", &self.is_join_waker_set())
611            .field("ref_count", &self.ref_count())
612            .finish()
613    }
614}