async_global_executor/
threading.rs

1use crate::Task;
2use async_channel::{Receiver, Sender};
3use async_lock::Mutex;
4use futures_lite::future;
5use once_cell::sync::OnceCell;
6use std::{io, thread};
7
8// The current number of threads (some might be shutting down and not in the pool anymore)
9static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
10// The expected number of threads (excluding the one that are shutting down)
11static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
12
13thread_local! {
14    // Used to shutdown a thread when we receive a message from the Sender.
15    // We send an ack using to the Receiver once we're finished shutting down.
16    static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = OnceCell::new();
17}
18
19/// Spawn more executor threads, up to configured max value.
20///
21/// Returns how many threads we spawned.
22///
23/// # Examples
24///
25/// ```
26/// async_global_executor::spawn_more_threads(2);
27/// ```
28pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
29    // Get the current configuration, or initialize the thread pool.
30    let config = crate::config::GLOBAL_EXECUTOR_CONFIG
31        .get()
32        .unwrap_or_else(|| {
33            crate::init();
34            crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
35        });
36    // How many threads do we have (including shutting down)
37    let mut threads_number = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
38    // How many threads are we supposed to have (when all shutdowns are complete)
39    let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
40    // Ensure we don't exceed configured max threads (including shutting down)
41    let count = count.min(config.max_threads - *threads_number);
42    for _ in 0..count {
43        thread::Builder::new()
44            .name((config.thread_name_fn)())
45            .spawn(thread_main_loop)?;
46        *threads_number += 1;
47        *expected_threads_number += 1;
48    }
49    Ok(count)
50}
51
52/// Stop one of the executor threads, down to configured min value
53///
54/// Returns whether a thread has been stopped.
55///
56/// # Examples
57///
58/// ```
59/// async_global_executor::stop_thread();
60/// ```
61pub fn stop_thread() -> Task<bool> {
62    crate::spawn(stop_current_executor_thread())
63}
64
65/// Stop the current executor thread, if we exceed the configured min value
66///
67/// Returns whether the thread has been stopped.
68///
69/// # Examples
70///
71/// ```
72/// async_global_executor::stop_current_thread();
73/// ```
74pub fn stop_current_thread() -> Task<bool> {
75    crate::spawn_local(stop_current_executor_thread())
76}
77
78fn thread_main_loop() {
79    // This will be used to ask for shutdown.
80    let (s, r) = async_channel::bounded(1);
81    // This wil be used to ack once shutdown is complete.
82    let (s_ack, r_ack) = async_channel::bounded(1);
83    THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack))));
84
85    // Main loop
86    loop {
87        #[allow(clippy::blocks_in_if_conditions)]
88        if std::panic::catch_unwind(|| {
89            crate::executor::LOCAL_EXECUTOR.with(|executor| {
90                let local = executor.run(async {
91                    // Wait until we're asked to shutdown.
92                    let _ = r.recv().await;
93                });
94                let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
95                crate::reactor::block_on(future::or(local, global));
96            });
97        })
98        .is_ok()
99        {
100            break;
101        }
102    }
103
104    wait_for_local_executor_completion();
105
106    // Ack that we're done shutting down.
107    crate::reactor::block_on(async {
108        let _ = s_ack.send(()).await;
109    });
110}
111
112fn wait_for_local_executor_completion() {
113    loop {
114        #[allow(clippy::blocks_in_if_conditions)]
115        if std::panic::catch_unwind(|| {
116            crate::executor::LOCAL_EXECUTOR.with(|executor| {
117                crate::reactor::block_on(async {
118                    // Wait for spawned tasks completion
119                    while !executor.is_empty() {
120                        executor.tick().await;
121                    }
122                });
123            });
124        })
125        .is_ok()
126        {
127            break;
128        }
129    }
130}
131
132async fn stop_current_executor_thread() -> bool {
133    // How many threads are we supposed to have (when all shutdowns are complete)
134    let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
135    // Ensure we don't go below the configured min_threads (ignoring shutting down)
136    if *expected_threads_number
137        > crate::config::GLOBAL_EXECUTOR_CONFIG
138            .get()
139            .unwrap()
140            .min_threads
141    {
142        let (s, r_ack) =
143            THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone());
144        let _ = s.send(()).await;
145        // We now expect to have one less thread (this one is shutting down)
146        *expected_threads_number -= 1;
147        // Unlock the Mutex
148        drop(expected_threads_number);
149        let _ = r_ack.recv().await;
150        // This thread is done shutting down
151        *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
152        true
153    } else {
154        false
155    }
156}