bevy_ecs/world/
command_queue.rs

1use crate::system::{SystemBuffer, SystemMeta};
2
3use std::{
4    fmt::Debug,
5    mem::MaybeUninit,
6    panic::{self, AssertUnwindSafe},
7    ptr::{addr_of_mut, NonNull},
8};
9
10use bevy_ptr::{OwningPtr, Unaligned};
11use bevy_utils::tracing::warn;
12
13use crate::world::{Command, World};
14
15use super::DeferredWorld;
16
17struct CommandMeta {
18    /// SAFETY: The `value` must point to a value of type `T: Command`,
19    /// where `T` is some specific type that was used to produce this metadata.
20    ///
21    /// `world` is optional to allow this one function pointer to perform double-duty as a drop.
22    ///
23    /// Advances `cursor` by the size of `T` in bytes.
24    consume_command_and_get_size:
25        unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
26}
27
28/// Densely and efficiently stores a queue of heterogenous types implementing [`Command`].
29//
30// NOTE: [`CommandQueue`] is implemented via a `Vec<MaybeUninit<u8>>` instead of a `Vec<Box<dyn Command>>`
31// as an optimization. Since commands are used frequently in systems as a way to spawn
32// entities/components/resources, and it's not currently possible to parallelize these
33// due to mutable [`World`] access, maximizing performance for [`CommandQueue`] is
34// preferred to simplicity of implementation.
35#[derive(Default)]
36pub struct CommandQueue {
37    // This buffer densely stores all queued commands.
38    //
39    // For each command, one `CommandMeta` is stored, followed by zero or more bytes
40    // to store the command itself. To interpret these bytes, a pointer must
41    // be passed to the corresponding `CommandMeta.apply_command_and_get_size` fn pointer.
42    pub(crate) bytes: Vec<MaybeUninit<u8>>,
43    pub(crate) cursor: usize,
44    pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
45}
46
47/// Wraps pointers to a [`CommandQueue`], used internally to avoid stacked borrow rules when
48/// partially applying the world's command queue recursively
49#[derive(Clone)]
50pub(crate) struct RawCommandQueue {
51    pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
52    pub(crate) cursor: NonNull<usize>,
53    pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
54}
55
56// CommandQueue needs to implement Debug manually, rather than deriving it, because the derived impl just prints
57// [core::mem::maybe_uninit::MaybeUninit<u8>, core::mem::maybe_uninit::MaybeUninit<u8>, ..] for every byte in the vec,
58// which gets extremely verbose very quickly, while also providing no useful information.
59// It is not possible to soundly print the values of the contained bytes, as some of them may be padding or uninitialized (#4863)
60// So instead, the manual impl just prints the length of vec.
61impl Debug for CommandQueue {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("CommandQueue")
64            .field("len_bytes", &self.bytes.len())
65            .finish_non_exhaustive()
66    }
67}
68
69// SAFETY: All commands [`Command`] implement [`Send`]
70unsafe impl Send for CommandQueue {}
71
72// SAFETY: `&CommandQueue` never gives access to the inner commands.
73unsafe impl Sync for CommandQueue {}
74
75impl CommandQueue {
76    /// Push a [`Command`] onto the queue.
77    #[inline]
78    pub fn push<C>(&mut self, command: C)
79    where
80        C: Command,
81    {
82        // SAFETY: self is guaranteed to live for the lifetime of this method
83        unsafe {
84            self.get_raw().push(command);
85        }
86    }
87
88    /// Execute the queued [`Command`]s in the world after applying any commands in the world's internal queue.
89    /// This clears the queue.
90    #[inline]
91    pub fn apply(&mut self, world: &mut World) {
92        // flush the previously queued entities
93        world.flush_entities();
94
95        // flush the world's internal queue
96        world.flush_commands();
97
98        // SAFETY: A reference is always a valid pointer
99        unsafe {
100            self.get_raw().apply_or_drop_queued(Some(world.into()));
101        }
102    }
103
104    /// Take all commands from `other` and append them to `self`, leaving `other` empty
105    pub fn append(&mut self, other: &mut CommandQueue) {
106        self.bytes.append(&mut other.bytes);
107    }
108
109    /// Returns false if there are any commands in the queue
110    #[inline]
111    pub fn is_empty(&self) -> bool {
112        self.cursor >= self.bytes.len()
113    }
114
115    /// Returns a [`RawCommandQueue`] instance sharing the underlying command queue.
116    pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
117        // SAFETY: self is always valid memory
118        unsafe {
119            RawCommandQueue {
120                bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
121                cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
122                panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
123            }
124        }
125    }
126}
127
128impl RawCommandQueue {
129    /// Returns a new `RawCommandQueue` instance, this must be manually dropped.
130    pub(crate) fn new() -> Self {
131        // SAFETY: Pointers returned by `Box::into_raw` are guaranteed to be non null
132        unsafe {
133            Self {
134                bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
135                cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
136                panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
137            }
138        }
139    }
140
141    /// Returns true if the queue is empty.
142    ///
143    /// # Safety
144    ///
145    /// * Caller ensures that `bytes` and `cursor` point to valid memory
146    pub unsafe fn is_empty(&self) -> bool {
147        // SAFETY: Pointers are guaranteed to be valid by requirements on `.clone_unsafe`
148        (unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
149    }
150
151    /// Push a [`Command`] onto the queue.
152    ///
153    /// # Safety
154    ///
155    /// * Caller ensures that `self` has not outlived the underlying queue
156    #[inline]
157    pub unsafe fn push<C>(&mut self, command: C)
158    where
159        C: Command,
160    {
161        // Stores a command alongside its metadata.
162        // `repr(C)` prevents the compiler from reordering the fields,
163        // while `repr(packed)` prevents the compiler from inserting padding bytes.
164        #[repr(C, packed)]
165        struct Packed<T: Command> {
166            meta: CommandMeta,
167            command: T,
168        }
169
170        let meta = CommandMeta {
171            consume_command_and_get_size: |command, world, cursor| {
172                *cursor += std::mem::size_of::<C>();
173
174                // SAFETY: According to the invariants of `CommandMeta.consume_command_and_get_size`,
175                // `command` must point to a value of type `C`.
176                let command: C = unsafe { command.read_unaligned() };
177                match world {
178                    // Apply command to the provided world...
179                    Some(mut world) => {
180                        // SAFETY: Caller ensures pointer is not null
181                        let world = unsafe { world.as_mut() };
182                        command.apply(world);
183                        // The command may have queued up world commands, which we flush here to ensure they are also picked up.
184                        // If the current command queue already the World Command queue, this will still behave appropriately because the global cursor
185                        // is still at the current `stop`, ensuring only the newly queued Commands will be applied.
186                        world.flush();
187                    }
188                    // ...or discard it.
189                    None => drop(command),
190                }
191            },
192        };
193
194        // SAFETY: There are no outstanding references to self.bytes
195        let bytes = unsafe { self.bytes.as_mut() };
196
197        let old_len = bytes.len();
198
199        // Reserve enough bytes for both the metadata and the command itself.
200        bytes.reserve(std::mem::size_of::<Packed<C>>());
201
202        // Pointer to the bytes at the end of the buffer.
203        // SAFETY: We know it is within bounds of the allocation, due to the call to `.reserve()`.
204        let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };
205
206        // Write the metadata into the buffer, followed by the command.
207        // We are using a packed struct to write them both as one operation.
208        // SAFETY: `ptr` must be non-null, since it is within a non-null buffer.
209        // The call to `reserve()` ensures that the buffer has enough space to fit a value of type `C`,
210        // and it is valid to write any bit pattern since the underlying buffer is of type `MaybeUninit<u8>`.
211        unsafe {
212            ptr.cast::<Packed<C>>()
213                .write_unaligned(Packed { meta, command });
214        }
215
216        // Extend the length of the buffer to include the data we just wrote.
217        // SAFETY: The new length is guaranteed to fit in the vector's capacity,
218        // due to the call to `.reserve()` above.
219        unsafe {
220            bytes.set_len(old_len + std::mem::size_of::<Packed<C>>());
221        }
222    }
223
224    /// If `world` is [`Some`], this will apply the queued [commands](`Command`).
225    /// If `world` is [`None`], this will drop the queued [commands](`Command`) (without applying them).
226    /// This clears the queue.
227    ///
228    /// # Safety
229    ///
230    /// * Caller ensures that `self` has not outlived the underlying queue
231    #[inline]
232    pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
233        // SAFETY: If this is the command queue on world, world will not be dropped as we have a mutable reference
234        // If this is not the command queue on world we have exclusive ownership and self will not be mutated
235        let start = *self.cursor.as_ref();
236        let stop = self.bytes.as_ref().len();
237        let mut local_cursor = start;
238        // SAFETY: we are setting the global cursor to the current length to prevent the executing commands from applying
239        // the remaining commands currently in this list. This is safe.
240        *self.cursor.as_mut() = stop;
241
242        while local_cursor < stop {
243            // SAFETY: The cursor is either at the start of the buffer, or just after the previous command.
244            // Since we know that the cursor is in bounds, it must point to the start of a new command.
245            let meta = unsafe {
246                self.bytes
247                    .as_mut()
248                    .as_mut_ptr()
249                    .add(local_cursor)
250                    .cast::<CommandMeta>()
251                    .read_unaligned()
252            };
253
254            // Advance to the bytes just after `meta`, which represent a type-erased command.
255            local_cursor += std::mem::size_of::<CommandMeta>();
256            // Construct an owned pointer to the command.
257            // SAFETY: It is safe to transfer ownership out of `self.bytes`, since the increment of `cursor` above
258            // guarantees that nothing stored in the buffer will get observed after this function ends.
259            // `cmd` points to a valid address of a stored command, so it must be non-null.
260            let cmd = unsafe {
261                OwningPtr::<Unaligned>::new(std::ptr::NonNull::new_unchecked(
262                    self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
263                ))
264            };
265            let result = panic::catch_unwind(AssertUnwindSafe(|| {
266                // SAFETY: The data underneath the cursor must correspond to the type erased in metadata,
267                // since they were stored next to each other by `.push()`.
268                // For ZSTs, the type doesn't matter as long as the pointer is non-null.
269                // This also advances the cursor past the command. For ZSTs, the cursor will not move.
270                // At this point, it will either point to the next `CommandMeta`,
271                // or the cursor will be out of bounds and the loop will end.
272                unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
273            }));
274
275            if let Err(payload) = result {
276                // local_cursor now points to the location _after_ the panicked command.
277                // Add the remaining commands that _would have_ been applied to the
278                // panic_recovery queue.
279                //
280                // This uses `current_stop` instead of `stop` to account for any commands
281                // that were queued _during_ this panic.
282                //
283                // This is implemented in such a way that if apply_or_drop_queued() are nested recursively in,
284                // an applied Command, the correct command order will be retained.
285                let panic_recovery = self.panic_recovery.as_mut();
286                let bytes = self.bytes.as_mut();
287                let current_stop = bytes.len();
288                panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
289                bytes.set_len(start);
290                *self.cursor.as_mut() = start;
291
292                // This was the "top of the apply stack". If we are _not_ at the top of the apply stack,
293                // when we call`resume_unwind" the caller "closer to the top" will catch the unwind and do this check,
294                // until we reach the top.
295                if start == 0 {
296                    bytes.append(panic_recovery);
297                }
298                panic::resume_unwind(payload);
299            }
300        }
301
302        // Reset the buffer: all commands past the original `start` cursor have been applied.
303        // SAFETY: we are setting the length of bytes to the original length, minus the length of the original
304        // list of commands being considered. All bytes remaining in the Vec are still valid, unapplied commands.
305        unsafe {
306            self.bytes.as_mut().set_len(start);
307            *self.cursor.as_mut() = start;
308        };
309    }
310}
311
312impl Drop for CommandQueue {
313    fn drop(&mut self) {
314        if !self.bytes.is_empty() {
315            warn!("CommandQueue has un-applied commands being dropped.");
316        }
317        // SAFETY: A reference is always a valid pointer
318        unsafe { self.get_raw().apply_or_drop_queued(None) };
319    }
320}
321
322impl SystemBuffer for CommandQueue {
323    #[inline]
324    fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
325        #[cfg(feature = "trace")]
326        let _span_guard = _system_meta.commands_span.enter();
327        self.apply(world);
328    }
329
330    #[inline]
331    fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
332        world.commands().append(self);
333    }
334}
335
336#[cfg(test)]
337mod test {
338    use super::*;
339    use crate as bevy_ecs;
340    use crate::system::Resource;
341    use std::{
342        panic::AssertUnwindSafe,
343        sync::{
344            atomic::{AtomicU32, Ordering},
345            Arc,
346        },
347    };
348
349    struct DropCheck(Arc<AtomicU32>);
350
351    impl DropCheck {
352        fn new() -> (Self, Arc<AtomicU32>) {
353            let drops = Arc::new(AtomicU32::new(0));
354            (Self(drops.clone()), drops)
355        }
356    }
357
358    impl Drop for DropCheck {
359        fn drop(&mut self) {
360            self.0.fetch_add(1, Ordering::Relaxed);
361        }
362    }
363
364    impl Command for DropCheck {
365        fn apply(self, _: &mut World) {}
366    }
367
368    #[test]
369    fn test_command_queue_inner_drop() {
370        let mut queue = CommandQueue::default();
371
372        let (dropcheck_a, drops_a) = DropCheck::new();
373        let (dropcheck_b, drops_b) = DropCheck::new();
374
375        queue.push(dropcheck_a);
376        queue.push(dropcheck_b);
377
378        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
379        assert_eq!(drops_b.load(Ordering::Relaxed), 0);
380
381        let mut world = World::new();
382        queue.apply(&mut world);
383
384        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
385        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
386    }
387
388    /// Asserts that inner [commands](`Command`) are dropped on early drop of [`CommandQueue`].
389    /// Originally identified as an issue in [#10676](https://github.com/bevyengine/bevy/issues/10676)
390    #[test]
391    fn test_command_queue_inner_drop_early() {
392        let mut queue = CommandQueue::default();
393
394        let (dropcheck_a, drops_a) = DropCheck::new();
395        let (dropcheck_b, drops_b) = DropCheck::new();
396
397        queue.push(dropcheck_a);
398        queue.push(dropcheck_b);
399
400        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
401        assert_eq!(drops_b.load(Ordering::Relaxed), 0);
402
403        drop(queue);
404
405        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
406        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
407    }
408
409    struct SpawnCommand;
410
411    impl Command for SpawnCommand {
412        fn apply(self, world: &mut World) {
413            world.spawn_empty();
414        }
415    }
416
417    #[test]
418    fn test_command_queue_inner() {
419        let mut queue = CommandQueue::default();
420
421        queue.push(SpawnCommand);
422        queue.push(SpawnCommand);
423
424        let mut world = World::new();
425        queue.apply(&mut world);
426
427        assert_eq!(world.entities().len(), 2);
428
429        // The previous call to `apply` cleared the queue.
430        // This call should do nothing.
431        queue.apply(&mut world);
432        assert_eq!(world.entities().len(), 2);
433    }
434
435    // This has an arbitrary value `String` stored to ensure
436    // when then command gets pushed, the `bytes` vector gets
437    // some data added to it.
438    #[allow(dead_code)]
439    struct PanicCommand(String);
440    impl Command for PanicCommand {
441        fn apply(self, _: &mut World) {
442            panic!("command is panicking");
443        }
444    }
445
446    #[test]
447    fn test_command_queue_inner_panic_safe() {
448        std::panic::set_hook(Box::new(|_| {}));
449
450        let mut queue = CommandQueue::default();
451
452        queue.push(PanicCommand("I panic!".to_owned()));
453        queue.push(SpawnCommand);
454
455        let mut world = World::new();
456
457        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
458            queue.apply(&mut world);
459        }));
460
461        // Even though the first command panicked, it's still ok to push
462        // more commands.
463        queue.push(SpawnCommand);
464        queue.push(SpawnCommand);
465        queue.apply(&mut world);
466        assert_eq!(world.entities().len(), 3);
467    }
468
469    #[test]
470    fn test_command_queue_inner_nested_panic_safe() {
471        std::panic::set_hook(Box::new(|_| {}));
472
473        #[derive(Resource, Default)]
474        struct Order(Vec<usize>);
475
476        let mut world = World::new();
477        world.init_resource::<Order>();
478
479        fn add_index(index: usize) -> impl Command {
480            move |world: &mut World| world.resource_mut::<Order>().0.push(index)
481        }
482        world.commands().add(add_index(1));
483        world.commands().add(|world: &mut World| {
484            world.commands().add(add_index(2));
485            world.commands().add(PanicCommand("I panic!".to_owned()));
486            world.commands().add(add_index(3));
487            world.flush_commands();
488        });
489        world.commands().add(add_index(4));
490
491        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
492            world.flush_commands();
493        }));
494
495        world.commands().add(add_index(5));
496        world.flush_commands();
497        assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
498    }
499
500    // NOTE: `CommandQueue` is `Send` because `Command` is send.
501    // If the `Command` trait gets reworked to be non-send, `CommandQueue`
502    // should be reworked.
503    // This test asserts that Command types are send.
504    fn assert_is_send_impl(_: impl Send) {}
505    fn assert_is_send(command: impl Command) {
506        assert_is_send_impl(command);
507    }
508
509    #[test]
510    fn test_command_is_send() {
511        assert_is_send(SpawnCommand);
512    }
513
514    #[allow(dead_code)]
515    struct CommandWithPadding(u8, u16);
516    impl Command for CommandWithPadding {
517        fn apply(self, _: &mut World) {}
518    }
519
520    #[cfg(miri)]
521    #[test]
522    fn test_uninit_bytes() {
523        let mut queue = CommandQueue::default();
524        queue.push(CommandWithPadding(0, 0));
525        let _ = format!("{:?}", queue.bytes);
526    }
527}