use std::{hash::{Hash, DefaultHasher, Hasher}, collections::{HashMap, HashSet, VecDeque}, ptr::NonNull, fmt::Debug}; use lyra_ecs::{system::{GraphExecutor, GraphExecutorError, System}, World}; #[derive(thiserror::Error, Debug)] pub enum StagedExecutorError { #[error("could not find the stage that {0} depends on")] MissingStage(String, u64), #[error("[stage={0}] could not find a system's dependency named `{1}`")] MissingSystem(String, String), #[error("[stage={0}] system `{1}` returned with an error: `{2}`")] SystemError(String, String, anyhow::Error) } impl StagedExecutorError { pub fn from_graph_error(stage: String, value: GraphExecutorError) -> Self { match value { GraphExecutorError::MissingSystem(s) => Self::MissingSystem(stage, s), GraphExecutorError::SystemError(s, e) => Self::SystemError(stage, s, e), } } } /// A Stage can be used to group the execution of systems together. pub trait Stage: Hash + Debug { fn hash_stage(&self) -> u64 { let mut s = DefaultHasher::new(); Hash::hash(self, &mut s); s.finish() } } struct StageStorage { hash: u64, name: String, exec: GraphExecutor, depend: Option, } /// A system executor that executes systems in stages. /// /// Stages can depend on other stages to ensure that a group of systems run before another group. #[derive(Default)] pub struct StagedExecutor { stages: HashMap, } impl StagedExecutor { pub fn new() -> Self { Self::default() } /// Add a stage that executes after another one. /// /// Parameters: /// * `before` - The stage that will run before `after`. /// * `after` - The stage that will run after `before`. pub fn add_stage_after(&mut self, before: T, after: U) where T: Stage, U: Stage, { let name = format!("{:?}", after); let strg = StageStorage { hash: after.hash_stage(), name, exec: GraphExecutor::default(), depend: Some(before.hash_stage()), }; self.stages.insert(after.hash_stage(), strg); } /// Add a stage. /// /// This stage could run at any moment if nothing is dependent on it. pub fn add_stage(&mut self, stage: T) { let name = format!("{:?}", stage); let strg = StageStorage { hash: stage.hash_stage(), name, exec: GraphExecutor::default(), depend: None }; self.stages.insert(stage.hash_stage(), strg); } /// Add a system to an already existing stage. /// /// # Panics /// Panics if the stage was not already added to the executor pub fn add_system_to_stage(&mut self, stage: T, name: &str, system: S, depends: &[&str]) where T: Stage, S: System + 'static { let hash = stage.hash_stage(); let stage = self.stages.get_mut(&hash) .expect("Unable to find the stage to add the system into! \ Did you add the stage first?"); let exec = &mut stage.exec; exec.insert_system(name, system, depends); } /// Execute the staged systems in order. /// /// If `stop_on_error` is false but errors are encountered, those errors will be returned in a Vec. pub fn execute(&mut self, world: NonNull, stop_on_error: bool) -> Result, StagedExecutorError> { let mut stack = VecDeque::new(); let mut visited = HashSet::new(); for (_, node) in self.stages.iter() { self.topological_sort(&mut stack, &mut visited, node)?; } let mut errors = vec![]; while let Some(node) = stack.pop_front() { let stage = self.stages.get_mut(&node).unwrap(); if let Err(e) = stage.exec.execute(world, stop_on_error) { let e = StagedExecutorError::from_graph_error(stage.name.clone(), e); if stop_on_error { return Err(e); } errors.push(e); unimplemented!("Cannot resume staged execution from error"); // TODO: resume staged execution from error } } Ok(errors) } fn topological_sort<'a>(&'a self, stack: &mut VecDeque, visited: &mut HashSet, node: &'a StageStorage) -> Result<(), StagedExecutorError> { if !visited.contains(&node.hash) { visited.insert(node.hash); if let Some(depend) = node.depend { let node = self.stages.get(&depend) .ok_or_else(|| StagedExecutorError::MissingStage(node.name.clone(), depend))?; if !visited.contains(&node.hash) { self.topological_sort(stack, visited, node)?; } } stack.push_back(node.hash); } Ok(()) } }