async_global_executor/
threading.rs1use crate::Task;
2use async_channel::{Receiver, Sender};
3use async_lock::Mutex;
4use futures_lite::future;
5use once_cell::sync::OnceCell;
6use std::{io, thread};
7
8static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
10static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
12
13thread_local! {
14 static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = OnceCell::new();
17}
18
19pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
29 let config = crate::config::GLOBAL_EXECUTOR_CONFIG
31 .get()
32 .unwrap_or_else(|| {
33 crate::init();
34 crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
35 });
36 let mut threads_number = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
38 let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
40 let count = count.min(config.max_threads - *threads_number);
42 for _ in 0..count {
43 thread::Builder::new()
44 .name((config.thread_name_fn)())
45 .spawn(thread_main_loop)?;
46 *threads_number += 1;
47 *expected_threads_number += 1;
48 }
49 Ok(count)
50}
51
52pub fn stop_thread() -> Task<bool> {
62 crate::spawn(stop_current_executor_thread())
63}
64
65pub fn stop_current_thread() -> Task<bool> {
75 crate::spawn_local(stop_current_executor_thread())
76}
77
78fn thread_main_loop() {
79 let (s, r) = async_channel::bounded(1);
81 let (s_ack, r_ack) = async_channel::bounded(1);
83 THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack))));
84
85 loop {
87 #[allow(clippy::blocks_in_if_conditions)]
88 if std::panic::catch_unwind(|| {
89 crate::executor::LOCAL_EXECUTOR.with(|executor| {
90 let local = executor.run(async {
91 let _ = r.recv().await;
93 });
94 let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
95 crate::reactor::block_on(future::or(local, global));
96 });
97 })
98 .is_ok()
99 {
100 break;
101 }
102 }
103
104 wait_for_local_executor_completion();
105
106 crate::reactor::block_on(async {
108 let _ = s_ack.send(()).await;
109 });
110}
111
112fn wait_for_local_executor_completion() {
113 loop {
114 #[allow(clippy::blocks_in_if_conditions)]
115 if std::panic::catch_unwind(|| {
116 crate::executor::LOCAL_EXECUTOR.with(|executor| {
117 crate::reactor::block_on(async {
118 while !executor.is_empty() {
120 executor.tick().await;
121 }
122 });
123 });
124 })
125 .is_ok()
126 {
127 break;
128 }
129 }
130}
131
132async fn stop_current_executor_thread() -> bool {
133 let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
135 if *expected_threads_number
137 > crate::config::GLOBAL_EXECUTOR_CONFIG
138 .get()
139 .unwrap()
140 .min_threads
141 {
142 let (s, r_ack) =
143 THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone());
144 let _ = s.send(()).await;
145 *expected_threads_number -= 1;
147 drop(expected_threads_number);
149 let _ = r_ack.recv().await;
150 *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
152 true
153 } else {
154 false
155 }
156}