use std::{
    any::Any,
    sync::{Arc, Mutex, MutexGuard},
};

use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use bevy_utils::default;
use bevy_utils::syncunsafecell::SyncUnsafeCell;
#[cfg(feature = "trace")]
use bevy_utils::tracing::{info_span, Span};
use std::panic::AssertUnwindSafe;

use concurrent_queue::ConcurrentQueue;
use fixedbitset::FixedBitSet;

use crate::{
    archetype::ArchetypeComponentId,
    prelude::Resource,
    query::Access,
    schedule::{is_apply_deferred, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule},
    system::BoxedSystem,
    world::{unsafe_world_cell::UnsafeWorldCell, World},
};

use crate as bevy_ecs;

use super::__rust_begin_short_backtrace;

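/// Borrowed data used during a single run of the [`MultiThreadedExecutor`].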
struct Environment<'env, 'sys> {
    executor: &'env MultiThreadedExecutor,
    systems: &'sys [SyncUnsafeCell<BoxedSystem>],
    conditions: SyncUnsafeCell<Conditions<'sys>>,
    world_cell: UnsafeWorldCell<'env>,
}

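/// Run conditions for systems and system sets, plus the set-membership bitsets needed
/// to evaluate and apply them, borrowed from the [`SystemSchedule`].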
struct Conditions<'a> {
    system_conditions: &'a mut [Vec<BoxedCondition>],
    set_conditions: &'a mut [Vec<BoxedCondition>],
    sets_with_conditions_of_systems: &'a [FixedBitSet],
    systems_in_sets_with_conditions: &'a [FixedBitSet],
}

impl<'env, 'sys> Environment<'env, 'sys> {
    fn new(
        executor: &'env MultiThreadedExecutor,
        schedule: &'sys mut SystemSchedule,
        world: &'env mut World,
    ) -> Self {
        Environment {
            executor,
            systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
            conditions: SyncUnsafeCell::new(Conditions {
                system_conditions: &mut schedule.system_conditions,
                set_conditions: &mut schedule.set_conditions,
                sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
                systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
            }),
            world_cell: world.as_unsafe_world_cell(),
        }
    }
}

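/// Per-system data used by the [`MultiThreadedExecutor`].
// Copied here because it can't be read from the system while it is running.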
struct SystemTaskMetadata {
    /// The `ArchetypeComponentId` access of the system.
    archetype_component_access: Access<ArchetypeComponentId>,
    /// Indices of the systems that directly depend on this system.
    dependents: Vec<usize>,
    /// Is `true` if the system does not access `!Send` data.
    is_send: bool,
    /// Is `true` if the system is exclusive.
    is_exclusive: bool,
}

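/// A message sent to the executor when a system task finishes.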
struct SystemResult {
    system_index: usize,
}

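/// Runs the schedule using a thread pool. Non-send systems are run on the main thread.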
pub struct MultiThreadedExecutor {
    /// The running state of the executor, locked so that system tasks can share a
    /// reference to the executor.
    state: Mutex<ExecutorState>,
    /// Queue of system completion events.
    system_completion: ConcurrentQueue<SystemResult>,
    /// When `true`, deferred buffers of all run systems are applied at the end of the schedule.
    apply_final_deferred: bool,
    /// The panic payload from a panicked system, if any, propagated after the schedule finishes.
    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
    /// Systems that have no dependencies and are ready at the start of each run.
    starting_systems: FixedBitSet,
    /// Cached tracing span for the executor.
    #[cfg(feature = "trace")]
    executor_span: Span,
}

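/// The state of the executor while a schedule is running.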
pub struct ExecutorState {
    /// Cached metadata about each system task.
    system_task_metadata: Vec<SystemTaskMetadata>,
    /// Union of the accesses of all currently running systems.
    active_access: Access<ArchetypeComponentId>,
    /// Is `true` if a system that is not `Send`, or an exclusive system, is running on the local thread.
    local_thread_running: bool,
    /// Is `true` if an exclusive system is running.
    exclusive_running: bool,
    /// The number of systems that are currently running.
    num_running_systems: usize,
    /// The number of dependencies each system has that have not yet completed.
    num_dependencies_remaining: Vec<usize>,
    /// System sets whose run conditions have already been evaluated.
    evaluated_sets: FixedBitSet,
    /// Systems with no remaining dependencies that are waiting to run.
    ready_systems: FixedBitSet,
    /// Scratch copy of `ready_systems`, reused to avoid repeated allocation.
    ready_systems_copy: FixedBitSet,
    /// Systems that are currently running.
    running_systems: FixedBitSet,
    /// Systems that got skipped.
    skipped_systems: FixedBitSet,
    /// Systems whose conditions have been evaluated and were either run or skipped.
    completed_systems: FixedBitSet,
    /// Systems that have run but have not had their deferred buffers applied.
    unapplied_systems: FixedBitSet,
}

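/// References to data required by the executor; copied into each system task so the
/// task can tick the executor when it completes.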
#[derive(Copy, Clone)]
struct Context<'scope, 'env, 'sys> {
    environment: &'env Environment<'env, 'sys>,
    scope: &'scope Scope<'scope, 'env, ()>,
}

impl Default for MultiThreadedExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl SystemExecutor for MultiThreadedExecutor {
    fn kind(&self) -> ExecutorKind {
        ExecutorKind::MultiThreaded
    }

    fn init(&mut self, schedule: &SystemSchedule) {
        let state = self.state.get_mut().unwrap();
        let sys_count = schedule.system_ids.len();
        let set_count = schedule.set_ids.len();

        self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
        self.starting_systems = FixedBitSet::with_capacity(sys_count);
        state.evaluated_sets = FixedBitSet::with_capacity(set_count);
        state.ready_systems = FixedBitSet::with_capacity(sys_count);
        state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
        state.running_systems = FixedBitSet::with_capacity(sys_count);
        state.completed_systems = FixedBitSet::with_capacity(sys_count);
        state.skipped_systems = FixedBitSet::with_capacity(sys_count);
        state.unapplied_systems = FixedBitSet::with_capacity(sys_count);

        state.system_task_metadata = Vec::with_capacity(sys_count);
        for index in 0..sys_count {
            state.system_task_metadata.push(SystemTaskMetadata {
                archetype_component_access: default(),
                dependents: schedule.system_dependents[index].clone(),
                is_send: schedule.systems[index].is_send(),
                is_exclusive: schedule.systems[index].is_exclusive(),
            });
            if schedule.system_dependencies[index] == 0 {
                self.starting_systems.insert(index);
            }
        }

        state.num_dependencies_remaining = Vec::with_capacity(sys_count);
    }

    fn run(
        &mut self,
        schedule: &mut SystemSchedule,
        world: &mut World,
        _skip_systems: Option<&FixedBitSet>,
    ) {
        let state = self.state.get_mut().unwrap();
        if schedule.systems.is_empty() {
            return;
        }
        state.num_running_systems = 0;
        state
            .num_dependencies_remaining
            .clone_from(&schedule.system_dependencies);
        state.ready_systems.clone_from(&self.starting_systems);

        // If stepping is enabled, make sure we skip those systems that should not run.
        #[cfg(feature = "bevy_debug_stepping")]
        if let Some(skipped_systems) = _skip_systems {
            debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
            // Mark skipped systems as completed.
            state.completed_systems |= skipped_systems;

            // Signal the dependents of each of the skipped systems, as
            // though they had run.
            for system_index in skipped_systems.ones() {
                state.signal_dependents(system_index);
                state.ready_systems.remove(system_index);
            }
        }

        let thread_executor = world
            .get_resource::<MainThreadExecutor>()
            .map(|e| e.0.clone());
        let thread_executor = thread_executor.as_deref();

        let environment = &Environment::new(self, schedule, world);

        ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
            false,
            thread_executor,
            |scope| {
                let context = Context { environment, scope };

                // The first tick of the executor spawns the initial batch of system tasks.
                context.tick_executor();
            },
        );

        // Copy out the `systems` reference to end the borrow of `self` held by `environment`.
        let systems = environment.systems;

        let state = self.state.get_mut().unwrap();
        if self.apply_final_deferred {
            // Do one final apply of deferred buffers after all systems have completed.
            let res = apply_deferred(&state.unapplied_systems, systems, world);
            if let Err(payload) = res {
                let panic_payload = self.panic_payload.get_mut().unwrap();
                *panic_payload = Some(payload);
            }
            state.unapplied_systems.clear();
        }

        // If any system panicked, resume unwinding on the schedule's thread.
        let payload = self.panic_payload.get_mut().unwrap();
        if let Some(payload) = payload.take() {
            std::panic::resume_unwind(payload);
        }

        debug_assert!(state.ready_systems.is_clear());
        debug_assert!(state.running_systems.is_clear());
        state.active_access.clear();
        state.evaluated_sets.clear();
        state.skipped_systems.clear();
        state.completed_systems.clear();
    }

    fn set_apply_final_deferred(&mut self, value: bool) {
        self.apply_final_deferred = value;
    }
}

impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
    fn system_completed(
        &self,
        system_index: usize,
        res: Result<(), Box<dyn Any + Send>>,
        system: &BoxedSystem,
    ) {
        // Tell the executor that the system finished.
        self.environment
            .executor
            .system_completion
            .push(SystemResult { system_index })
            .unwrap_or_else(|error| unreachable!("{}", error));
        if let Err(payload) = res {
            eprintln!("Encountered a panic in system `{}`!", &*system.name());
            // Store the payload so the executor can propagate the panic once the schedule finishes.
            {
                let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
                *panic_payload = Some(payload);
            }
        }
        self.tick_executor();
    }

    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
        let guard = self.environment.executor.state.try_lock().ok()?;
        // SAFETY: This is the only place the conditions cell is accessed, and the access
        // is synchronized by holding the lock on the executor state.
        let conditions = unsafe { &mut *self.environment.conditions.get() };
        Some((conditions, guard))
    }

    fn tick_executor(&self) {
        // Ensure that the executor handles any events pushed to the system_completion queue by this thread.
        // If this thread acquires the lock, the executor runs after the push() and will process the event.
        // If this thread does not acquire the lock, then the is_empty() check on the other thread runs
        // after the lock is released, which is after try_lock() failed, which is after the push()
        // on this thread, so the is_empty() check will see the new event and loop.
        loop {
            let Some((conditions, mut guard)) = self.try_lock() else {
                return;
            };
            guard.tick(self, conditions);
            // The lock must be released before the is_empty() check, per the protocol described above.
            drop(guard);
            if self.environment.executor.system_completion.is_empty() {
                return;
            }
        }
    }
}

impl MultiThreadedExecutor {
    /// Creates a new multi-threaded executor for use with a schedule.
    pub fn new() -> Self {
        Self {
            state: Mutex::new(ExecutorState::new()),
            system_completion: ConcurrentQueue::unbounded(),
            starting_systems: FixedBitSet::new(),
            apply_final_deferred: true,
            panic_payload: Mutex::new(None),
            #[cfg(feature = "trace")]
            executor_span: info_span!("multithreaded executor"),
        }
    }
}

impl ExecutorState {
    fn new() -> Self {
        Self {
            system_task_metadata: Vec::new(),
            num_running_systems: 0,
            num_dependencies_remaining: Vec::new(),
            active_access: default(),
            local_thread_running: false,
            exclusive_running: false,
            evaluated_sets: FixedBitSet::new(),
            ready_systems: FixedBitSet::new(),
            ready_systems_copy: FixedBitSet::new(),
            running_systems: FixedBitSet::new(),
            skipped_systems: FixedBitSet::new(),
            completed_systems: FixedBitSet::new(),
            unapplied_systems: FixedBitSet::new(),
        }
    }

    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
        #[cfg(feature = "trace")]
        let _span = context.environment.executor.executor_span.enter();

        for result in context.environment.executor.system_completion.try_iter() {
            self.finish_system_and_handle_dependents(result);
        }

        self.rebuild_active_access();

        // SAFETY:
        // - `finish_system_and_handle_dependents` has updated the currently running systems.
        // - `rebuild_active_access` locks access for all currently running systems.
        unsafe {
            self.spawn_system_tasks(context, conditions);
        }
    }

    /// # Safety
    /// - Caller must ensure that `self.ready_systems` does not contain any systems that
    ///   have been mutably borrowed (such as the systems currently running).
    /// - `world_cell` must have permission to access all world data (not counting
    ///   any world data that is claimed by systems currently running on this executor).
    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
        if self.exclusive_running {
            return;
        }

        // Take the scratch bitset out so the loop below can borrow `self` mutably.
        let mut ready_systems = std::mem::take(&mut self.ready_systems_copy);

        // Skipping systems may cause their dependents to become ready immediately.
        // If that happens, the loop has to run again or those dependents would not be spawned.
        let mut check_for_new_ready_systems = true;
        while check_for_new_ready_systems {
            check_for_new_ready_systems = false;

            ready_systems.clone_from(&self.ready_systems);

            for system_index in ready_systems.ones() {
                debug_assert!(!self.running_systems.contains(system_index));
                // SAFETY: The caller ensures that ready systems are not currently running,
                // so no other reference to this system exists and there is no aliasing.
                let system = unsafe { &mut *context.environment.systems[system_index].get() };

                if !self.can_run(
                    system_index,
                    system,
                    conditions,
                    context.environment.world_cell,
                ) {
                    // The system cannot run right now; leave it in `ready_systems`
                    // so it is reconsidered on a later tick.
                    continue;
                }

                self.ready_systems.remove(system_index);

                // SAFETY: `can_run` returned true, which means that:
                // - It must have called `update_archetype_component_access` for each run condition.
                // - There are no running systems whose accesses conflict with any of the conditions.
                if unsafe {
                    !self.should_run(
                        system_index,
                        system,
                        conditions,
                        context.environment.world_cell,
                    )
                } {
                    self.skip_system_and_signal_dependents(system_index);
                    // Skipping the system may have made some of its dependents ready.
                    check_for_new_ready_systems = true;
                    continue;
                }

                self.running_systems.insert(system_index);
                self.num_running_systems += 1;

                if self.system_task_metadata[system_index].is_exclusive {
                    // SAFETY: `can_run` returned true for this system,
                    // which means no other systems are currently borrowed.
                    unsafe {
                        self.spawn_exclusive_system_task(context, system_index);
                    }
                    check_for_new_ready_systems = false;
                    break;
                }

                // SAFETY:
                // - No other reference to this system exists.
                // - `can_run` has updated the system's archetype component access and checked it
                //   against all running systems.
                // - The caller ensures the world has permission to run the system.
                unsafe {
                    self.spawn_system_task(context, system_index);
                }
            }
        }

        // Give the scratch bitset back.
        self.ready_systems_copy = ready_systems;
    }

    fn can_run(
        &mut self,
        system_index: usize,
        system: &mut BoxedSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let system_meta = &self.system_task_metadata[system_index];
        if system_meta.is_exclusive && self.num_running_systems > 0 {
            return false;
        }

        if !system_meta.is_send && self.local_thread_running {
            return false;
        }

        for set_idx in conditions.sets_with_conditions_of_systems[system_index]
            .difference(&self.evaluated_sets)
        {
            for condition in &mut conditions.set_conditions[set_idx] {
                condition.update_archetype_component_access(world);
                if !condition
                    .archetype_component_access()
                    .is_compatible(&self.active_access)
                {
                    return false;
                }
            }
        }

        for condition in &mut conditions.system_conditions[system_index] {
            condition.update_archetype_component_access(world);
            if !condition
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }
        }

        if !self.skipped_systems.contains(system_index) {
            system.update_archetype_component_access(world);
            if !system
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }

            self.system_task_metadata[system_index]
                .archetype_component_access
                .clone_from(system.archetype_component_access());
        }

        true
    }

    /// # Safety
    /// - The caller must ensure that `update_archetype_component_access` has been
    ///   called for each of this system's run conditions.
    /// - The caller must ensure that no running system conflicts with the world
    ///   access of any of those conditions.
    unsafe fn should_run(
        &mut self,
        system_index: usize,
        _system: &BoxedSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let mut should_run = !self.skipped_systems.contains(system_index);
        for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
            if self.evaluated_sets.contains(set_idx) {
                continue;
            }

            // Evaluate the system set's conditions.
            // SAFETY: The caller upholds that the conditions' access is compatible
            // with the world access of all running systems.
            let set_conditions_met = unsafe {
                evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world)
            };

            if !set_conditions_met {
                self.skipped_systems
                    .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
            }

            should_run &= set_conditions_met;
            self.evaluated_sets.insert(set_idx);
        }

        // Evaluate the system's own conditions.
        // SAFETY: The caller upholds that the conditions' access is compatible
        // with the world access of all running systems.
        let system_conditions_met = unsafe {
            evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world)
        };

        if !system_conditions_met {
            self.skipped_systems.insert(system_index);
        }

        should_run &= system_conditions_met;

        should_run
    }

    /// # Safety
    /// - Caller must not alias systems that are running.
    /// - `world_cell` must have permission to access the world data used by the system.
    /// - `update_archetype_component_access` must have been called with `world`
    ///   on the system associated with `system_index`.
    unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: This system is not running, so no other reference to it exists.
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Move the full context object into the new future.
        let context = *context;

        let system_meta = &self.system_task_metadata[system_index];

        let task = async move {
            let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                // SAFETY:
                // - The caller ensures that we have permission to access the world
                //   data used by the system.
                // - `update_archetype_component_access` has been called.
                unsafe {
                    __rust_begin_short_backtrace::run_unsafe(
                        &mut **system,
                        context.environment.world_cell,
                    );
                };
            }));
            context.system_completed(system_index, res, system);
        };

        self.active_access
            .extend(&system_meta.archetype_component_access);

        if system_meta.is_send {
            context.scope.spawn(task);
        } else {
            self.local_thread_running = true;
            context.scope.spawn_on_external(task);
        }
    }

    /// # Safety
    /// Caller must ensure no systems are currently borrowed.
    unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: `can_run` returned true for this system, which means
        // no other systems currently have access to the world.
        let world = unsafe { context.environment.world_cell.world_mut() };
        // SAFETY: This system is not running, so no other reference to it exists.
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Move the full context object into the new future.
        let context = *context;

        if is_apply_deferred(system) {
            // Apply the deferred buffers of everything that has run since the last application point.
            let unapplied_systems = self.unapplied_systems.clone();
            self.unapplied_systems.clear();
            let task = async move {
                let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        } else {
            let task = async move {
                let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    __rust_begin_short_backtrace::run(&mut **system, world);
                }));
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        }

        self.exclusive_running = true;
        self.local_thread_running = true;
    }

    fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
        let SystemResult { system_index, .. } = result;

        if self.system_task_metadata[system_index].is_exclusive {
            self.exclusive_running = false;
        }

        if !self.system_task_metadata[system_index].is_send {
            self.local_thread_running = false;
        }

        debug_assert!(self.num_running_systems >= 1);
        self.num_running_systems -= 1;
        self.running_systems.remove(system_index);
        self.completed_systems.insert(system_index);
        self.unapplied_systems.insert(system_index);

        self.signal_dependents(system_index);
    }

    fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
        self.completed_systems.insert(system_index);
        self.signal_dependents(system_index);
    }

    fn signal_dependents(&mut self, system_index: usize) {
        for &dep_idx in &self.system_task_metadata[system_index].dependents {
            let remaining = &mut self.num_dependencies_remaining[dep_idx];
            debug_assert!(*remaining >= 1);
            *remaining -= 1;
            if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
                self.ready_systems.insert(dep_idx);
            }
        }
    }

    fn rebuild_active_access(&mut self) {
        self.active_access.clear();
        for index in self.running_systems.ones() {
            let system_meta = &self.system_task_metadata[index];
            self.active_access
                .extend(&system_meta.archetype_component_access);
        }
    }
}

fn apply_deferred(
    unapplied_systems: &FixedBitSet,
    systems: &[SyncUnsafeCell<BoxedSystem>],
    world: &mut World,
) -> Result<(), Box<dyn Any + Send>> {
    for system_index in unapplied_systems.ones() {
        // SAFETY: None of these systems are running, so no other references exist.
        let system = unsafe { &mut *systems[system_index].get() };
        let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
            system.apply_deferred(world);
        }));
        if let Err(payload) = res {
            eprintln!(
                "Encountered a panic when applying buffers for system `{}`!",
                &*system.name()
            );
            return Err(payload);
        }
    }
    Ok(())
}

/// # Safety
/// - `world` must have permission to read any world data required by `conditions`.
/// - `update_archetype_component_access` must have been called with `world`
///   for each condition in `conditions`.
unsafe fn evaluate_and_fold_conditions(
    conditions: &mut [BoxedCondition],
    world: UnsafeWorldCell,
) -> bool {
    // Not short-circuiting is intentional: every condition is evaluated.
    #[allow(clippy::unnecessary_fold)]
    conditions
        .iter_mut()
        .map(|condition| {
            // SAFETY: The caller ensures that `world` has permission to read any data
            // required by the condition, and that its access has been updated.
            unsafe { __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world) }
        })
        .fold(true, |acc, res| acc && res)
}

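/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread.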
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);

impl Default for MainThreadExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl MainThreadExecutor {
    /// Creates a new executor that can be used to run systems on the main thread.
    pub fn new() -> Self {
        MainThreadExecutor(TaskPool::get_thread_executor())
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        self as bevy_ecs,
        prelude::Resource,
        schedule::{ExecutorKind, IntoSystemConfigs, Schedule},
        system::Commands,
        world::World,
    };

    #[derive(Resource)]
    struct R;

    #[test]
    fn skipped_systems_notify_dependents() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(
            (
                (|| {}).run_if(|| false),
                // This system depends on a system that is always skipped.
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );
        schedule.run(&mut world);
        assert!(world.get_resource::<R>().is_some());
    }
}