bevy_utils/parallel_queue.rs
1use std::{cell::RefCell, ops::DerefMut};
2use thread_local::ThreadLocal;
3
4/// A cohesive set of thread-local values of a given type.
5///
6/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`].
7#[derive(Default)]
8pub struct Parallel<T: Send> {
9 locals: ThreadLocal<RefCell<T>>,
10}
11
12/// A scope guard of a `Parallel`, when this struct is dropped ,the value will writeback to its `Parallel`
13impl<T: Send> Parallel<T> {
14 /// Gets a mutable iterator over all of the per-thread queues.
15 pub fn iter_mut(&mut self) -> impl Iterator<Item = &'_ mut T> {
16 self.locals.iter_mut().map(|cell| cell.get_mut())
17 }
18
19 /// Clears all of the stored thread local values.
20 pub fn clear(&mut self) {
21 self.locals.clear();
22 }
23}
24
25impl<T: Default + Send> Parallel<T> {
26 /// Retrieves the thread-local value for the current thread and runs `f` on it.
27 ///
28 /// If there is no thread-local value, it will be initialized to its default.
29 pub fn scope<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
30 let mut cell = self.locals.get_or_default().borrow_mut();
31 let ret = f(cell.deref_mut());
32 ret
33 }
34
35 /// Mutably borrows the thread-local value.
36 ///
37 /// If there is no thread-local value, it will be initialized to it's default.
38 pub fn borrow_local_mut(&self) -> impl DerefMut<Target = T> + '_ {
39 self.locals.get_or_default().borrow_mut()
40 }
41}
42
43impl<T, I> Parallel<I>
44where
45 I: IntoIterator<Item = T> + Default + Send + 'static,
46{
47 /// Drains all enqueued items from all threads and returns an iterator over them.
48 ///
49 /// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored.
50 /// If iteration is terminated part way, the rest of the enqueued items in the same
51 /// chunk will be dropped, and the rest of the undrained elements will remain.
52 ///
53 /// The ordering is not guaranteed.
54 pub fn drain<B>(&mut self) -> impl Iterator<Item = T> + '_
55 where
56 B: FromIterator<T>,
57 {
58 self.locals.iter_mut().flat_map(|item| item.take())
59 }
60}
61
62impl<T: Send> Parallel<Vec<T>> {
63 /// Collect all enqueued items from all threads and appends them to the end of a
64 /// single Vec.
65 ///
66 /// The ordering is not guaranteed.
67 pub fn drain_into(&mut self, out: &mut Vec<T>) {
68 let size = self
69 .locals
70 .iter_mut()
71 .map(|queue| queue.get_mut().len())
72 .sum();
73 out.reserve(size);
74 for queue in self.locals.iter_mut() {
75 out.append(queue.get_mut());
76 }
77 }
78}