tokio/runtime/task/
state.rs1use crate::loom::sync::atomic::AtomicUsize;
2
3use std::fmt;
4use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5
/// Atomic state word of a task.
///
/// A single `usize` holds the flag bits covered by `STATE_MASK` in its low
/// bits and the reference count in the remaining high bits (see
/// `REF_COUNT_MASK` / `REF_COUNT_SHIFT`).
pub(super) struct State {
    /// The packed flags + reference count, only ever accessed atomically.
    val: AtomicUsize,
}
9
/// A plain (non-atomic) copy of the task state word.
///
/// Produced by `State::load` and by the transition methods; the helper
/// methods on this type decode or edit the flag bits and the reference count
/// of that copy without touching the shared atomic.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
13
/// Result of a conditional state update (see `State::fetch_update`):
/// `Ok` carries the snapshot that was stored, `Err` carries the snapshot that
/// was observed when the update was declined.
type UpdateResult = Result<Snapshot, Snapshot>;
15
/// Flag set by `transition_to_running` / `transition_to_shutdown` and cleared
/// by `transition_to_idle` / `transition_to_complete`.
const RUNNING: usize = 0b0000_0001;

/// Flag set by `transition_to_complete`.
const COMPLETE: usize = 0b0000_0010;

/// Both lifecycle flags; a snapshot with neither bit set is "idle".
const LIFECYCLE_MASK: usize = RUNNING | COMPLETE;

/// Flag set by the `transition_to_notified_*` methods and cleared when the
/// task starts running.
const NOTIFIED: usize = 0b0000_0100;

/// Flag that is part of the initial state and is cleared by
/// `unset_join_interested` / `drop_join_handle_fast`.
const JOIN_INTEREST: usize = 0b0000_1000;

/// Flag toggled by `set_join_waker` / `unset_waker`.
const JOIN_WAKER: usize = 0b0001_0000;

/// Flag set by `transition_to_notified_and_cancel` and
/// `transition_to_shutdown`.
const CANCELLED: usize = 0b0010_0000;

/// Every bit of the word that is used as a flag.
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Every bit of the word that belongs to the reference count.
const REF_COUNT_MASK: usize = !STATE_MASK;

/// Number of bits the reference count is shifted left by, i.e. the number of
/// flag bits sitting below it.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;

/// The value that represents exactly one reference inside the packed word.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;

/// State of a newly created task: three references, already notified, with
/// join interest registered.
///
/// NOTE(review): one of the three references is released together with
/// `JOIN_INTEREST` in `drop_join_handle_fast`; who owns the other two is not
/// visible in this file — confirm against the task constructor.
const INITIAL_STATE: usize = (3 * REF_ONE) | NOTIFIED | JOIN_INTEREST;
62
/// Outcome of `State::transition_to_running`.
#[must_use]
pub(super) enum TransitionToRunning {
    /// The task was idle and not cancelled; `RUNNING` is now set and
    /// `NOTIFIED` cleared.
    Success,
    /// The task was idle but has the `CANCELLED` flag; `RUNNING` is now set
    /// and `NOTIFIED` cleared.
    Cancelled,
    /// The task was not idle (running or complete); one reference was
    /// dropped and others remain.
    Failed,
    /// The task was not idle and the dropped reference was the last one.
    Dealloc,
}
70
/// Outcome of `State::transition_to_idle`.
#[must_use]
pub(super) enum TransitionToIdle {
    /// `RUNNING` was cleared, the task was not notified, one reference was
    /// dropped and others remain.
    Ok,
    /// `RUNNING` was cleared while `NOTIFIED` was set; one reference was
    /// added.
    OkNotified,
    /// `RUNNING` was cleared, the task was not notified, and the dropped
    /// reference was the last one.
    OkDealloc,
    /// The `CANCELLED` flag was set; the state was left unchanged.
    Cancelled,
}
78
/// Outcome of `State::transition_to_notified_by_val`.
#[must_use]
pub(super) enum TransitionToNotifiedByVal {
    /// One reference was dropped and others remain. This happens either when
    /// the task is running (in which case `NOTIFIED` was also set) or when it
    /// is already complete or already notified.
    DoNothing,
    /// The task was idle and not notified; `NOTIFIED` was set and one
    /// reference was added.
    Submit,
    /// The task was complete or already notified and the dropped reference
    /// was the last one.
    Dealloc,
}
85
/// Outcome of `State::transition_to_notified_by_ref`.
#[must_use]
pub(crate) enum TransitionToNotifiedByRef {
    /// Either nothing changed (task complete or already notified) or only
    /// the `NOTIFIED` flag was set (task currently running).
    DoNothing,
    /// The task was idle and not notified; `NOTIFIED` was set and one
    /// reference was added.
    Submit,
}
91
92impl State {
95 pub(super) fn new() -> State {
97 State {
100 val: AtomicUsize::new(INITIAL_STATE),
101 }
102 }
103
104 pub(super) fn load(&self) -> Snapshot {
106 Snapshot(self.val.load(Acquire))
107 }
108
109 pub(super) fn transition_to_running(&self) -> TransitionToRunning {
112 self.fetch_update_action(|mut next| {
113 let action;
114 assert!(next.is_notified());
115
116 if !next.is_idle() {
117 next.ref_dec();
121 if next.ref_count() == 0 {
122 action = TransitionToRunning::Dealloc;
123 } else {
124 action = TransitionToRunning::Failed;
125 }
126 } else {
127 next.set_running();
129 next.unset_notified();
130
131 if next.is_cancelled() {
132 action = TransitionToRunning::Cancelled;
133 } else {
134 action = TransitionToRunning::Success;
135 }
136 }
137 (action, Some(next))
138 })
139 }
140
141 pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
146 self.fetch_update_action(|curr| {
147 assert!(curr.is_running());
148
149 if curr.is_cancelled() {
150 return (TransitionToIdle::Cancelled, None);
151 }
152
153 let mut next = curr;
154 let action;
155 next.unset_running();
156
157 if !next.is_notified() {
158 next.ref_dec();
160 if next.ref_count() == 0 {
161 action = TransitionToIdle::OkDealloc;
162 } else {
163 action = TransitionToIdle::Ok;
164 }
165 } else {
166 next.ref_inc();
170 action = TransitionToIdle::OkNotified;
171 }
172
173 (action, Some(next))
174 })
175 }
176
177 pub(super) fn transition_to_complete(&self) -> Snapshot {
179 const DELTA: usize = RUNNING | COMPLETE;
180
181 let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
182 assert!(prev.is_running());
183 assert!(!prev.is_complete());
184
185 Snapshot(prev.0 ^ DELTA)
186 }
187
188 pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
193 let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
194 assert!(
195 prev.ref_count() >= count,
196 "current: {}, sub: {}",
197 prev.ref_count(),
198 count
199 );
200 prev.ref_count() == count
201 }
202
203 pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
210 self.fetch_update_action(|mut snapshot| {
211 let action;
212
213 if snapshot.is_running() {
214 snapshot.set_notified();
218 snapshot.ref_dec();
219
220 assert!(snapshot.ref_count() > 0);
222
223 action = TransitionToNotifiedByVal::DoNothing;
224 } else if snapshot.is_complete() || snapshot.is_notified() {
225 snapshot.ref_dec();
228
229 if snapshot.ref_count() == 0 {
230 action = TransitionToNotifiedByVal::Dealloc;
231 } else {
232 action = TransitionToNotifiedByVal::DoNothing;
233 }
234 } else {
235 snapshot.set_notified();
238 snapshot.ref_inc();
239 action = TransitionToNotifiedByVal::Submit;
240 }
241
242 (action, Some(snapshot))
243 })
244 }
245
246 pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
248 self.fetch_update_action(|mut snapshot| {
249 if snapshot.is_complete() || snapshot.is_notified() {
250 (TransitionToNotifiedByRef::DoNothing, None)
252 } else if snapshot.is_running() {
253 snapshot.set_notified();
257 (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
258 } else {
259 snapshot.set_notified();
262 snapshot.ref_inc();
263 (TransitionToNotifiedByRef::Submit, Some(snapshot))
264 }
265 })
266 }
267
268 #[cfg(all(
274 tokio_unstable,
275 tokio_taskdump,
276 feature = "rt",
277 target_os = "linux",
278 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
279 ))]
280 pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
281 self.fetch_update_action(|mut snapshot| {
282 if snapshot.is_notified() {
283 (false, None)
284 } else {
285 snapshot.set_notified();
286 snapshot.ref_inc();
287 (true, Some(snapshot))
288 }
289 })
290 }
291
292 pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
297 self.fetch_update_action(|mut snapshot| {
298 if snapshot.is_cancelled() || snapshot.is_complete() {
299 (false, None)
301 } else if snapshot.is_running() {
302 snapshot.set_notified();
310 snapshot.set_cancelled();
311 (false, Some(snapshot))
312 } else {
313 snapshot.set_cancelled();
317 if !snapshot.is_notified() {
318 snapshot.set_notified();
319 snapshot.ref_inc();
320 (true, Some(snapshot))
321 } else {
322 (false, Some(snapshot))
323 }
324 }
325 })
326 }
327
328 pub(super) fn transition_to_shutdown(&self) -> bool {
332 let mut prev = Snapshot(0);
333
334 let _ = self.fetch_update(|mut snapshot| {
335 prev = snapshot;
336
337 if snapshot.is_idle() {
338 snapshot.set_running();
339 }
340
341 snapshot.set_cancelled();
345 Some(snapshot)
346 });
347
348 prev.is_idle()
349 }
350
351 pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
354 use std::sync::atomic::Ordering::Relaxed;
355
356 self.val
364 .compare_exchange_weak(
365 INITIAL_STATE,
366 (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
367 Release,
368 Relaxed,
369 )
370 .map(|_| ())
371 .map_err(|_| ())
372 }
373
374 pub(super) fn unset_join_interested(&self) -> UpdateResult {
379 self.fetch_update(|curr| {
380 assert!(curr.is_join_interested());
381
382 if curr.is_complete() {
383 return None;
384 }
385
386 let mut next = curr;
387 next.unset_join_interested();
388
389 Some(next)
390 })
391 }
392
393 pub(super) fn set_join_waker(&self) -> UpdateResult {
398 self.fetch_update(|curr| {
399 assert!(curr.is_join_interested());
400 assert!(!curr.is_join_waker_set());
401
402 if curr.is_complete() {
403 return None;
404 }
405
406 let mut next = curr;
407 next.set_join_waker();
408
409 Some(next)
410 })
411 }
412
413 pub(super) fn unset_waker(&self) -> UpdateResult {
418 self.fetch_update(|curr| {
419 assert!(curr.is_join_interested());
420 assert!(curr.is_join_waker_set());
421
422 if curr.is_complete() {
423 return None;
424 }
425
426 let mut next = curr;
427 next.unset_join_waker();
428
429 Some(next)
430 })
431 }
432
433 pub(super) fn ref_inc(&self) {
434 use std::process;
435 use std::sync::atomic::Ordering::Relaxed;
436
437 let prev = self.val.fetch_add(REF_ONE, Relaxed);
449
450 if prev > isize::MAX as usize {
452 process::abort();
453 }
454 }
455
456 pub(super) fn ref_dec(&self) -> bool {
458 let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
459 assert!(prev.ref_count() >= 1);
460 prev.ref_count() == 1
461 }
462
463 pub(super) fn ref_dec_twice(&self) -> bool {
465 let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
466 assert!(prev.ref_count() >= 2);
467 prev.ref_count() == 2
468 }
469
470 fn fetch_update_action<F, T>(&self, mut f: F) -> T
471 where
472 F: FnMut(Snapshot) -> (T, Option<Snapshot>),
473 {
474 let mut curr = self.load();
475
476 loop {
477 let (output, next) = f(curr);
478 let next = match next {
479 Some(next) => next,
480 None => return output,
481 };
482
483 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
484
485 match res {
486 Ok(_) => return output,
487 Err(actual) => curr = Snapshot(actual),
488 }
489 }
490 }
491
492 fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
493 where
494 F: FnMut(Snapshot) -> Option<Snapshot>,
495 {
496 let mut curr = self.load();
497
498 loop {
499 let next = match f(curr) {
500 Some(next) => next,
501 None => return Err(curr),
502 };
503
504 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
505
506 match res {
507 Ok(_) => return Ok(next),
508 Err(actual) => curr = Snapshot(actual),
509 }
510 }
511 }
512}
513
514impl Snapshot {
517 pub(super) fn is_idle(self) -> bool {
519 self.0 & (RUNNING | COMPLETE) == 0
520 }
521
522 pub(super) fn is_notified(self) -> bool {
524 self.0 & NOTIFIED == NOTIFIED
525 }
526
527 fn unset_notified(&mut self) {
528 self.0 &= !NOTIFIED;
529 }
530
531 fn set_notified(&mut self) {
532 self.0 |= NOTIFIED;
533 }
534
535 pub(super) fn is_running(self) -> bool {
536 self.0 & RUNNING == RUNNING
537 }
538
539 fn set_running(&mut self) {
540 self.0 |= RUNNING;
541 }
542
543 fn unset_running(&mut self) {
544 self.0 &= !RUNNING;
545 }
546
547 pub(super) fn is_cancelled(self) -> bool {
548 self.0 & CANCELLED == CANCELLED
549 }
550
551 fn set_cancelled(&mut self) {
552 self.0 |= CANCELLED;
553 }
554
555 pub(super) fn is_complete(self) -> bool {
557 self.0 & COMPLETE == COMPLETE
558 }
559
560 pub(super) fn is_join_interested(self) -> bool {
561 self.0 & JOIN_INTEREST == JOIN_INTEREST
562 }
563
564 fn unset_join_interested(&mut self) {
565 self.0 &= !JOIN_INTEREST;
566 }
567
568 pub(super) fn is_join_waker_set(self) -> bool {
569 self.0 & JOIN_WAKER == JOIN_WAKER
570 }
571
572 fn set_join_waker(&mut self) {
573 self.0 |= JOIN_WAKER;
574 }
575
576 fn unset_join_waker(&mut self) {
577 self.0 &= !JOIN_WAKER;
578 }
579
580 pub(super) fn ref_count(self) -> usize {
581 (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
582 }
583
584 fn ref_inc(&mut self) {
585 assert!(self.0 <= isize::MAX as usize);
586 self.0 += REF_ONE;
587 }
588
589 pub(super) fn ref_dec(&mut self) {
590 assert!(self.ref_count() > 0);
591 self.0 -= REF_ONE;
592 }
593}
594
595impl fmt::Debug for State {
596 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
597 let snapshot = self.load();
598 snapshot.fmt(fmt)
599 }
600}
601
602impl fmt::Debug for Snapshot {
603 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
604 fmt.debug_struct("Snapshot")
605 .field("is_running", &self.is_running())
606 .field("is_complete", &self.is_complete())
607 .field("is_notified", &self.is_notified())
608 .field("is_cancelled", &self.is_cancelled())
609 .field("is_join_interested", &self.is_join_interested())
610 .field("is_join_waker_set", &self.is_join_waker_set())
611 .field("ref_count", &self.ref_count())
612 .finish()
613 }
614}