bevy_ecs/schedule/executor/multi_threaded.rs

use std::{
    any::Any,
    sync::{Arc, Mutex, MutexGuard},
};

use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use bevy_utils::default;
use bevy_utils::syncunsafecell::SyncUnsafeCell;
#[cfg(feature = "trace")]
use bevy_utils::tracing::{info_span, Span};
use std::panic::AssertUnwindSafe;

use concurrent_queue::ConcurrentQueue;
use fixedbitset::FixedBitSet;

use crate::{
    archetype::ArchetypeComponentId,
    prelude::Resource,
    query::Access,
    schedule::{is_apply_deferred, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule},
    system::BoxedSystem,
    world::{unsafe_world_cell::UnsafeWorldCell, World},
};

use crate as bevy_ecs;

use super::__rust_begin_short_backtrace;

/// Borrowed data used by the [`MultiThreadedExecutor`].
struct Environment<'env, 'sys> {
    executor: &'env MultiThreadedExecutor,
    systems: &'sys [SyncUnsafeCell<BoxedSystem>],
    conditions: SyncUnsafeCell<Conditions<'sys>>,
    world_cell: UnsafeWorldCell<'env>,
}

struct Conditions<'a> {
    system_conditions: &'a mut [Vec<BoxedCondition>],
    set_conditions: &'a mut [Vec<BoxedCondition>],
    sets_with_conditions_of_systems: &'a [FixedBitSet],
    systems_in_sets_with_conditions: &'a [FixedBitSet],
}

impl<'env, 'sys> Environment<'env, 'sys> {
    fn new(
        executor: &'env MultiThreadedExecutor,
        schedule: &'sys mut SystemSchedule,
        world: &'env mut World,
    ) -> Self {
        Environment {
            executor,
            systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
            conditions: SyncUnsafeCell::new(Conditions {
                system_conditions: &mut schedule.system_conditions,
                set_conditions: &mut schedule.set_conditions,
                sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
                systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
            }),
            world_cell: world.as_unsafe_world_cell(),
        }
    }
}

/// Per-system data used by the [`MultiThreadedExecutor`].
// Copied here because it can't be read from the system when it's running.
struct SystemTaskMetadata {
    /// The [`ArchetypeComponentId`] access of the system.
    archetype_component_access: Access<ArchetypeComponentId>,
    /// Indices of the systems that directly depend on the system.
    dependents: Vec<usize>,
    /// Is `true` if the system does not access `!Send` data.
    is_send: bool,
    /// Is `true` if the system is exclusive.
    is_exclusive: bool,
}

/// The result of running a system that is sent across a channel.
struct SystemResult {
    system_index: usize,
}

/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
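///
/// A minimal usage sketch (illustrative only; assumes a `Schedule` value named `schedule`
/// that was built elsewhere):
///
/// ```ignore
/// use bevy_ecs::schedule::ExecutorKind;
///
/// // Ask the schedule to run its systems with the multi-threaded executor.
/// schedule.set_executor_kind(ExecutorKind::MultiThreaded);
/// ```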
pub struct MultiThreadedExecutor {
    /// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.
    state: Mutex<ExecutorState>,
    /// Queue of system completion events.
    system_completion: ConcurrentQueue<SystemResult>,
    /// When set to `true`, deferred system buffers are applied after all systems have run.
    apply_final_deferred: bool,
    /// When set, tells the executor that a thread has panicked.
    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
    starting_systems: FixedBitSet,
    /// Cached tracing span
    #[cfg(feature = "trace")]
    executor_span: Span,
}

/// The state of the executor while running.
pub struct ExecutorState {
    /// Metadata for scheduling and running system tasks.
    system_task_metadata: Vec<SystemTaskMetadata>,
    /// Union of the accesses of all currently running systems.
    active_access: Access<ArchetypeComponentId>,
    /// Is `true` if a system with non-`Send` access is running.
    local_thread_running: bool,
    /// Is `true` if an exclusive system is running.
    exclusive_running: bool,
    /// The number of systems that are running.
    num_running_systems: usize,
    /// The number of dependencies each system has that have not completed.
    num_dependencies_remaining: Vec<usize>,
    /// System sets whose conditions have been evaluated.
    evaluated_sets: FixedBitSet,
    /// Systems that have no remaining dependencies and are waiting to run.
    ready_systems: FixedBitSet,
    /// A scratch copy of `ready_systems`, used when spawning tasks because the loop mutably borrows `self`.
    ready_systems_copy: FixedBitSet,
    /// Systems that are running.
    running_systems: FixedBitSet,
    /// Systems that got skipped.
    skipped_systems: FixedBitSet,
    /// Systems whose conditions have been evaluated and were run or skipped.
    completed_systems: FixedBitSet,
    /// Systems that have run but have not had their buffers applied.
    unapplied_systems: FixedBitSet,
}

/// References to data required by the executor.
/// This is copied to each system task so that it can invoke the executor when it completes.
// These all need to outlive 'scope in order to be sent to new tasks,
// and keeping them all in a struct means we can use lifetime elision.
#[derive(Copy, Clone)]
struct Context<'scope, 'env, 'sys> {
    environment: &'env Environment<'env, 'sys>,
    scope: &'scope Scope<'scope, 'env, ()>,
}

impl Default for MultiThreadedExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl SystemExecutor for MultiThreadedExecutor {
    fn kind(&self) -> ExecutorKind {
        ExecutorKind::MultiThreaded
    }

    fn init(&mut self, schedule: &SystemSchedule) {
        let state = self.state.get_mut().unwrap();
        // pre-allocate space
        let sys_count = schedule.system_ids.len();
        let set_count = schedule.set_ids.len();

        self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
        self.starting_systems = FixedBitSet::with_capacity(sys_count);
        state.evaluated_sets = FixedBitSet::with_capacity(set_count);
        state.ready_systems = FixedBitSet::with_capacity(sys_count);
        state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
        state.running_systems = FixedBitSet::with_capacity(sys_count);
        state.completed_systems = FixedBitSet::with_capacity(sys_count);
        state.skipped_systems = FixedBitSet::with_capacity(sys_count);
        state.unapplied_systems = FixedBitSet::with_capacity(sys_count);

        state.system_task_metadata = Vec::with_capacity(sys_count);
        for index in 0..sys_count {
            state.system_task_metadata.push(SystemTaskMetadata {
                archetype_component_access: default(),
                dependents: schedule.system_dependents[index].clone(),
                is_send: schedule.systems[index].is_send(),
                is_exclusive: schedule.systems[index].is_exclusive(),
            });
            if schedule.system_dependencies[index] == 0 {
                self.starting_systems.insert(index);
            }
        }

        state.num_dependencies_remaining = Vec::with_capacity(sys_count);
    }

    fn run(
        &mut self,
        schedule: &mut SystemSchedule,
        world: &mut World,
        _skip_systems: Option<&FixedBitSet>,
    ) {
        let state = self.state.get_mut().unwrap();
        // reset counts
        if schedule.systems.is_empty() {
            return;
        }
        state.num_running_systems = 0;
        state
            .num_dependencies_remaining
            .clone_from(&schedule.system_dependencies);
        state.ready_systems.clone_from(&self.starting_systems);

        // If stepping is enabled, make sure we skip those systems that should
        // not be run.
        #[cfg(feature = "bevy_debug_stepping")]
        if let Some(skipped_systems) = _skip_systems {
            debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
            // mark skipped systems as completed
            state.completed_systems |= skipped_systems;

            // signal the dependencies for each of the skipped systems, as
            // though they had run
            for system_index in skipped_systems.ones() {
                state.signal_dependents(system_index);
                state.ready_systems.remove(system_index);
            }
        }

        let thread_executor = world
            .get_resource::<MainThreadExecutor>()
            .map(|e| e.0.clone());
        let thread_executor = thread_executor.as_deref();

        let environment = &Environment::new(self, schedule, world);

        ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
            false,
            thread_executor,
            |scope| {
                let context = Context { environment, scope };

                // The first tick won't need to process finished systems, but we still need to run the loop in
                // tick_executor() in case a system completes while the first tick still holds the mutex.
                context.tick_executor();
            },
        );

        // End the borrows of self and world in environment by copying out the reference to systems.
        let systems = environment.systems;

        let state = self.state.get_mut().unwrap();
        if self.apply_final_deferred {
            // Do one final apply buffers after all systems have completed
            // Commands should be applied while on the scope's thread, not the executor's thread
            let res = apply_deferred(&state.unapplied_systems, systems, world);
            if let Err(payload) = res {
                let panic_payload = self.panic_payload.get_mut().unwrap();
                *panic_payload = Some(payload);
            }
            state.unapplied_systems.clear();
        }

        // check to see if there was a panic
        let payload = self.panic_payload.get_mut().unwrap();
        if let Some(payload) = payload.take() {
            std::panic::resume_unwind(payload);
        }

        debug_assert!(state.ready_systems.is_clear());
        debug_assert!(state.running_systems.is_clear());
        state.active_access.clear();
        state.evaluated_sets.clear();
        state.skipped_systems.clear();
        state.completed_systems.clear();
    }

    fn set_apply_final_deferred(&mut self, value: bool) {
        self.apply_final_deferred = value;
    }
}

impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
    fn system_completed(
        &self,
        system_index: usize,
        res: Result<(), Box<dyn Any + Send>>,
        system: &BoxedSystem,
    ) {
        // tell the executor that the system finished
        self.environment
            .executor
            .system_completion
            .push(SystemResult { system_index })
            .unwrap_or_else(|error| unreachable!("{}", error));
        if let Err(payload) = res {
            eprintln!("Encountered a panic in system `{}`!", &*system.name());
            // set the payload to propagate the error
            {
                let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
                *panic_payload = Some(payload);
            }
        }
        self.tick_executor();
    }

    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
        let guard = self.environment.executor.state.try_lock().ok()?;
        // SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
        // is synchronized by the lock on the executor state.
        let conditions = unsafe { &mut *self.environment.conditions.get() };
        Some((conditions, guard))
    }

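    /// Drains system-completion events and spawns any systems that became ready.
    ///
    /// A minimal sketch of the lock/recheck pattern used below (illustrative only; the
    /// hypothetical `queue` and `state_mutex` stand in for `system_completion` and `state`):
    ///
    /// ```ignore
    /// queue.push(event); // done by the producer in `system_completed`
    /// loop {
    ///     let Some(mut state) = state_mutex.try_lock() else {
    ///         return; // the current lock holder re-checks the queue after releasing it
    ///     };
    ///     state.process(queue.try_iter());
    ///     drop(state); // release before the emptiness check, or events could be lost
    ///     if queue.is_empty() {
    ///         return;
    ///     }
    /// }
    /// ```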
    fn tick_executor(&self) {
        // Ensure that the executor handles any events pushed to the system_completion queue by this thread.
        // If this thread acquires the lock, the executor runs after the push() and the events are processed.
        // If this thread does not acquire the lock, then the is_empty() check on the other thread runs
        // after the lock is released, which is after try_lock() failed, which is after the push()
        // on this thread, so the is_empty() check will see the new events and loop.
        loop {
            let Some((conditions, mut guard)) = self.try_lock() else {
                return;
            };
            guard.tick(self, conditions);
            // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
            drop(guard);
            if self.environment.executor.system_completion.is_empty() {
                return;
            }
        }
    }
}

impl MultiThreadedExecutor {
    /// Creates a new `multi_threaded` executor for use with a [`Schedule`].
    ///
    /// [`Schedule`]: crate::schedule::Schedule
    pub fn new() -> Self {
        Self {
            state: Mutex::new(ExecutorState::new()),
            system_completion: ConcurrentQueue::unbounded(),
            starting_systems: FixedBitSet::new(),
            apply_final_deferred: true,
            panic_payload: Mutex::new(None),
            #[cfg(feature = "trace")]
            executor_span: info_span!("multithreaded executor"),
        }
    }
}

impl ExecutorState {
    fn new() -> Self {
        Self {
            system_task_metadata: Vec::new(),
            num_running_systems: 0,
            num_dependencies_remaining: Vec::new(),
            active_access: default(),
            local_thread_running: false,
            exclusive_running: false,
            evaluated_sets: FixedBitSet::new(),
            ready_systems: FixedBitSet::new(),
            ready_systems_copy: FixedBitSet::new(),
            running_systems: FixedBitSet::new(),
            skipped_systems: FixedBitSet::new(),
            completed_systems: FixedBitSet::new(),
            unapplied_systems: FixedBitSet::new(),
        }
    }

    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
        #[cfg(feature = "trace")]
        let _span = context.environment.executor.executor_span.enter();

        for result in context.environment.executor.system_completion.try_iter() {
            self.finish_system_and_handle_dependents(result);
        }

        self.rebuild_active_access();

        // SAFETY:
        // - `finish_system_and_handle_dependents` has updated the currently running systems.
        // - `rebuild_active_access` locks access for all currently running systems.
        unsafe {
            self.spawn_system_tasks(context, conditions);
        }
    }

    /// # Safety
    /// - Caller must ensure that `self.ready_systems` does not contain any systems that
    ///   have been mutably borrowed (such as the systems currently running).
    /// - `world_cell` must have permission to access all world data (not counting
    ///   any world data that is claimed by systems currently running on this executor).
    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
        if self.exclusive_running {
            return;
        }

        // can't borrow since loop mutably borrows `self`
        let mut ready_systems = std::mem::take(&mut self.ready_systems_copy);

        // Skipping systems may cause their dependents to become ready immediately.
        // If that happens, we need to run again immediately or we may fail to spawn those dependents.
        let mut check_for_new_ready_systems = true;
        while check_for_new_ready_systems {
            check_for_new_ready_systems = false;

            ready_systems.clone_from(&self.ready_systems);

            for system_index in ready_systems.ones() {
                debug_assert!(!self.running_systems.contains(system_index));
                // SAFETY: Caller assured that these systems are not running.
                // Therefore, no other reference to this system exists and there is no aliasing.
                let system = unsafe { &mut *context.environment.systems[system_index].get() };

                if !self.can_run(
                    system_index,
                    system,
                    conditions,
                    context.environment.world_cell,
                ) {
                    // NOTE: exclusive systems with ambiguities are susceptible to
                    // being significantly displaced here (compared to single-threaded order)
                    // if systems after them in topological order can run.
                    // If that becomes an issue, `break;` here when the system is exclusive.
                    continue;
                }

                self.ready_systems.remove(system_index);

                // SAFETY: `can_run` returned true, which means that:
                // - It must have called `update_archetype_component_access` for each run condition.
                // - There can be no systems running whose accesses would conflict with any conditions.
                if unsafe {
                    !self.should_run(
                        system_index,
                        system,
                        conditions,
                        context.environment.world_cell,
                    )
                } {
                    self.skip_system_and_signal_dependents(system_index);
                    // signal_dependents may have set more systems to ready.
                    check_for_new_ready_systems = true;
                    continue;
                }

                self.running_systems.insert(system_index);
                self.num_running_systems += 1;

                if self.system_task_metadata[system_index].is_exclusive {
                    // SAFETY: `can_run` returned true for this system,
                    // which means no systems are currently borrowed.
                    unsafe {
                        self.spawn_exclusive_system_task(context, system_index);
                    }
                    check_for_new_ready_systems = false;
                    break;
                }

                // SAFETY:
                // - Caller ensured no other reference to this system exists.
                // - `can_run` has been called, which calls `update_archetype_component_access` with this system.
                // - `can_run` returned true, so no systems with conflicting world access are running.
                unsafe {
                    self.spawn_system_task(context, system_index);
                }
            }
        }

        // give back
        self.ready_systems_copy = ready_systems;
    }

    fn can_run(
        &mut self,
        system_index: usize,
        system: &mut BoxedSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let system_meta = &self.system_task_metadata[system_index];
        if system_meta.is_exclusive && self.num_running_systems > 0 {
            return false;
        }

        if !system_meta.is_send && self.local_thread_running {
            return false;
        }

        // TODO: an earlier out if world's archetypes did not change
        for set_idx in conditions.sets_with_conditions_of_systems[system_index]
            .difference(&self.evaluated_sets)
        {
            for condition in &mut conditions.set_conditions[set_idx] {
                condition.update_archetype_component_access(world);
                if !condition
                    .archetype_component_access()
                    .is_compatible(&self.active_access)
                {
                    return false;
                }
            }
        }

        for condition in &mut conditions.system_conditions[system_index] {
            condition.update_archetype_component_access(world);
            if !condition
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }
        }

        if !self.skipped_systems.contains(system_index) {
            system.update_archetype_component_access(world);
            if !system
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }

            self.system_task_metadata[system_index]
                .archetype_component_access
                .clone_from(system.archetype_component_access());
        }

        true
    }

    /// # Safety
    /// * `world` must have permission to read any world data required by
    ///   the system's conditions: this includes conditions for the system
    ///   itself, and conditions for any of the system's sets.
    /// * `update_archetype_component_access` must have been called with `world`
    ///   for each run condition in `conditions`.
    unsafe fn should_run(
        &mut self,
        system_index: usize,
        _system: &BoxedSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let mut should_run = !self.skipped_systems.contains(system_index);
        for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
            if self.evaluated_sets.contains(set_idx) {
                continue;
            }

            // Evaluate the system set's conditions.
            // SAFETY:
            // - The caller ensures that `world` has permission to read any data
            //   required by the conditions.
            // - `update_archetype_component_access` has been called for each run condition.
            let set_conditions_met = unsafe {
                evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world)
            };

            if !set_conditions_met {
                self.skipped_systems
                    .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
            }

            should_run &= set_conditions_met;
            self.evaluated_sets.insert(set_idx);
        }

        // Evaluate the system's conditions.
        // SAFETY:
        // - The caller ensures that `world` has permission to read any data
        //   required by the conditions.
        // - `update_archetype_component_access` has been called for each run condition.
        let system_conditions_met = unsafe {
            evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world)
        };

        if !system_conditions_met {
            self.skipped_systems.insert(system_index);
        }

        should_run &= system_conditions_met;

        should_run
    }

    /// # Safety
    /// - Caller must not alias systems that are running.
    /// - `world` must have permission to access the world data
    ///   used by the specified system.
    /// - `update_archetype_component_access` must have been called with `world`
    ///   on the system associated with `system_index`.
    unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: this system is not running, no other reference exists
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Move the full context object into the new future.
        let context = *context;

        let system_meta = &self.system_task_metadata[system_index];

        let task = async move {
            let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                // SAFETY:
                // - The caller ensures that we have permission to
                // access the world data used by the system.
                // - `update_archetype_component_access` has been called.
                unsafe {
                    __rust_begin_short_backtrace::run_unsafe(
                        &mut **system,
                        context.environment.world_cell,
                    );
                };
            }));
            context.system_completed(system_index, res, system);
        };

        self.active_access
            .extend(&system_meta.archetype_component_access);

        if system_meta.is_send {
            context.scope.spawn(task);
        } else {
            self.local_thread_running = true;
            context.scope.spawn_on_external(task);
        }
    }

    /// # Safety
    /// Caller must ensure no systems are currently borrowed.
    unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: `can_run` returned true for this system, which means
        // that no other systems currently have access to the world.
        let world = unsafe { context.environment.world_cell.world_mut() };
        // SAFETY: this system is not running, no other reference exists
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Move the full context object into the new future.
        let context = *context;

        if is_apply_deferred(system) {
            // TODO: avoid allocation
            let unapplied_systems = self.unapplied_systems.clone();
            self.unapplied_systems.clear();
            let task = async move {
                let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        } else {
            let task = async move {
                let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    __rust_begin_short_backtrace::run(&mut **system, world);
                }));
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        }

        self.exclusive_running = true;
        self.local_thread_running = true;
    }

    fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
        let SystemResult { system_index, .. } = result;

        if self.system_task_metadata[system_index].is_exclusive {
            self.exclusive_running = false;
        }

        if !self.system_task_metadata[system_index].is_send {
            self.local_thread_running = false;
        }

        debug_assert!(self.num_running_systems >= 1);
        self.num_running_systems -= 1;
        self.running_systems.remove(system_index);
        self.completed_systems.insert(system_index);
        self.unapplied_systems.insert(system_index);

        self.signal_dependents(system_index);
    }

    fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
        self.completed_systems.insert(system_index);
        self.signal_dependents(system_index);
    }

    fn signal_dependents(&mut self, system_index: usize) {
        for &dep_idx in &self.system_task_metadata[system_index].dependents {
            let remaining = &mut self.num_dependencies_remaining[dep_idx];
            debug_assert!(*remaining >= 1);
            *remaining -= 1;
            if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
                self.ready_systems.insert(dep_idx);
            }
        }
    }

    fn rebuild_active_access(&mut self) {
        self.active_access.clear();
        for index in self.running_systems.ones() {
            let system_meta = &self.system_task_metadata[index];
            self.active_access
                .extend(&system_meta.archetype_component_access);
        }
    }
}

fn apply_deferred(
    unapplied_systems: &FixedBitSet,
    systems: &[SyncUnsafeCell<BoxedSystem>],
    world: &mut World,
) -> Result<(), Box<dyn Any + Send>> {
    for system_index in unapplied_systems.ones() {
        // SAFETY: none of these systems are running, no other references exist
        let system = unsafe { &mut *systems[system_index].get() };
        let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
            system.apply_deferred(world);
        }));
        if let Err(payload) = res {
            eprintln!(
                "Encountered a panic when applying buffers for system `{}`!",
                &*system.name()
            );
            return Err(payload);
        }
    }
    Ok(())
}

/// # Safety
/// - `world` must have permission to read any world data
///   required by `conditions`.
/// - `update_archetype_component_access` must have been called
///   with `world` for each condition in `conditions`.
unsafe fn evaluate_and_fold_conditions(
    conditions: &mut [BoxedCondition],
    world: UnsafeWorldCell,
) -> bool {
    // not short-circuiting is intentional
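    // (Using `fold` instead of `all` means evaluation does not stop at the first `false`:
    // every condition still runs, presumably so that conditions with internal state, such as
    // change detection, are updated on every schedule run.)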
    #[allow(clippy::unnecessary_fold)]
    conditions
        .iter_mut()
        .map(|condition| {
            // SAFETY: The caller ensures that `world` has permission to
            // access any data required by the condition.
            unsafe { __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world) }
        })
        .fold(true, |acc, res| acc && res)
}

/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
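///
/// A minimal installation sketch (illustrative only; assumes a `World` value named `world`,
/// and that this type is reachable at the path used in the import):
///
/// ```ignore
/// use bevy_ecs::schedule::MainThreadExecutor;
///
/// // The multi-threaded executor looks this resource up in `run` and uses it for
/// // work that needs to stay on the main thread.
/// world.insert_resource(MainThreadExecutor::new());
/// ```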
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);

impl Default for MainThreadExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl MainThreadExecutor {
    /// Creates a new executor that can be used to run systems on the main thread.
    pub fn new() -> Self {
        MainThreadExecutor(TaskPool::get_thread_executor())
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        self as bevy_ecs,
        prelude::Resource,
        schedule::{ExecutorKind, IntoSystemConfigs, Schedule},
        system::Commands,
        world::World,
    };

    #[derive(Resource)]
    struct R;

    #[test]
    fn skipped_systems_notify_dependents() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(
            (
                (|| {}).run_if(|| false),
                // This system depends on a system that is always skipped.
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );
        schedule.run(&mut world);
        assert!(world.get_resource::<R>().is_some());
    }
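
    // An additional check sketched from the panic handling above (assumed behavior): a panic
    // inside a system task is caught by the executor, stored in `panic_payload`, and re-thrown
    // from `run` via `resume_unwind`, so it should surface in the thread that ran the schedule.
    #[test]
    #[should_panic]
    fn panic_in_system_is_propagated_to_caller() {
        fn panicking_system() {
            panic!("oops");
        }

        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(panicking_system);
        schedule.run(&mut world);
    }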
}