ecs: make resources Send + Sync, rewrite Commands, CommandsQueue so that they are Send + Sync

This commit is contained in:
SeanOMik 2024-03-30 22:12:00 -04:00
parent 61efc358ce
commit 46cdcfdd3b
Signed by: SeanOMik
GPG Key ID: FEC9E2FC15235964
8 changed files with 193 additions and 76 deletions

View File

@ -14,6 +14,7 @@ lyra-math = { path = "../lyra-math", optional = true }
anyhow = "1.0.75"
thiserror = "1.0.50"
paste = "1.0.14"
atomic_refcell = "0.1.13"
[dev-dependencies]
rand = "0.8.5" # used for tests

View File

@ -1,4 +1,4 @@
use std::{any::Any, cell::RefMut, collections::VecDeque, ptr::{self, NonNull}};
use std::{any::Any, cell::RefMut, mem::{self, MaybeUninit}, ptr::{self, NonNull}};
use crate::{system::FnArgFetcher, Access, Bundle, Entities, Entity, World};
@ -23,24 +23,73 @@ where
}
}
type RunCommand = unsafe fn(cmd: Box<dyn Command>, world: &mut World);
type RunCommand = unsafe fn(cmd: *mut (), world: Option<&mut World>) -> usize;
#[repr(C, packed)]
struct PackedCommand<T: Command> {
run: RunCommand,
cmd: T,
}
/// Stores a queue of commands that will get executed after the system is ran.
///
/// This struct can be inserted as a resource into the world, and the commands will be
/// executed by the [`GraphExecutor`](crate::system::GraphExecutor) after the system is executed.
#[derive(Default)]
pub struct CommandQueue(VecDeque<(RunCommand, Box<dyn Command>)>);
pub struct CommandQueue {
data: Vec<MaybeUninit<u8>>,
}
impl CommandQueue {
/// Execute the commands in the queue.
///
/// If `world` is `None`, the commands will just be dropped and the memory freed.
fn execute(&mut self, mut world: Option<&mut World>) {
let range = self.data.as_mut_ptr_range();
let mut current = range.start;
let end = range.end;
while current < end {
// Retrieve the runner for the command.
// Safety: current pointer will either be the start of the buffer, or at the start of a new PackedCommand
let run_fn = unsafe { current.cast::<RunCommand>().read_unaligned() };
// Retrieves the pointer to the command which is just after RunCommand due to PackedCommand.
// Safety: PackedCommand is repr C and packed, so it will be right after the RunCommand.
current = unsafe { current.add(mem::size_of::<RunCommand>()) };
// Now run the command, providing the type erased pointer to the command.
let read_size = unsafe { run_fn(current.cast(), world.as_deref_mut()) };
// The pointer is added to so that it is just after the command that was ran.
// Safety: the RunCommand returns the size of the command
current = unsafe { current.add(read_size) };
}
// Safety: all of the commands were just read from the pointers.
unsafe { self.data.set_len(0) };
}
}
impl Drop for CommandQueue {
fn drop(&mut self) {
if !self.data.is_empty() {
println!("CommandQueue has commands but is being dropped");
}
self.execute(None);
}
}
/// Used in a system to queue up commands that will run right after this system.
///
/// This can be used to delay the mutation of the world until after the system is ran. These
/// must be used if you're mutating the world inside a [`ViewState`](crate::query::ViewState).
/// must be used if you're mutating the world inside a [`View`](crate::query::View).
///
/// ```nobuild
/// fn particle_spawner_system(
/// commands: Commands,
/// view: ViewState<(&Campfire, &Transform)>
/// view: View<(&Campfire, &Transform)>
/// ) -> anyhow::Result<()> {
/// for (campfire, pos) in view.iter() {
/// // If you do not use commands to spawn this, the next iteration
@ -66,17 +115,44 @@ impl<'a, 'b> Commands<'a, 'b> {
/// Add a command to the end of the command queue
pub fn add<C: Command>(&mut self, cmd: C) {
let cmd = Box::new(cmd);
let run_fn = |cmd_ptr: *mut (), world: Option<&mut World>| {
// Safety: the pointer is a type-erased pointer to the command. The pointer is read
// then dropped out of scope, this closure will not be ran again so no use-after-free
// will occur.
let cmd: C = unsafe { ptr::read_unaligned(cmd_ptr.cast::<C>()) };
match world {
Some(world) => cmd.run(world),
None => {} // cmd just gets dropped
}
let run_fn = |cmd: Box<dyn Command>, world: &mut World| {
let cmd = cmd.as_any_boxed()
.downcast::<C>()
.unwrap();
cmd.run(world);
// the size of the command must be returned to increment the pointer when applying
// the command queue.
mem::size_of::<C>()
};
self.queue.0.push_back((run_fn, cmd));
let data = &mut self.queue.data;
// Reserve enough bytes from the vec to store the packed command and its run fn.
let old_len = data.len();
data.reserve(mem::size_of::<PackedCommand<C>>());
// Get a pointer to the end of the packed data. Safe since we just reserved enough memory
// to store this command.
let end_ptr = unsafe { data.as_mut_ptr().add(old_len) };
unsafe {
// write the command and its runner into the buffer
end_ptr.cast::<PackedCommand<C>>()
// written unaligned to keep everything packed
.write_unaligned(PackedCommand {
run: run_fn,
cmd,
});
// we wrote to the vec's buffer without using its api, so we need manually
// set the length of the vec.
data.set_len(old_len + mem::size_of::<PackedCommand<C>>());
}
}
/// Spawn an entity into the World. See [`World::spawn`]
@ -91,14 +167,8 @@ impl<'a, 'b> Commands<'a, 'b> {
}
/// Execute all commands in the queue, in order of insertion
pub fn execute(&mut self, world: &mut World) -> anyhow::Result<()> {
while let Some((cmd_fn, cmd_ptr)) = self.queue.0.pop_front() {
unsafe {
cmd_fn(cmd_ptr, world);
}
}
Ok(())
pub fn execute(&mut self, world: &mut World) {
self.queue.execute(Some(world));
}
}
@ -125,21 +195,26 @@ impl FnArgFetcher for Commands<'_, '_> {
let mut cmds = Commands::new(&mut state, world);
// safety: Commands has a mut borrow only to entities in the world
let world = unsafe { world_ptr.as_mut() };
cmds.execute(world).unwrap()
cmds.execute(world);
}
}
/// A system for executing deferred commands that are stored in a [`World`] as a Resource.
///
/// Commands are usually added inside a system from a [`Commands`] object created just for it
/// as an fn argument. However, there may be cases that commands cannot be added that way, so
/// they can also be added as a resource and executed later in this system.
pub fn execute_deferred_commands(world: &mut World, mut commands: RefMut<Commands>) -> anyhow::Result<()> {
commands.execute(world)?;
commands.execute(world);
Ok(())
}
#[cfg(test)]
mod tests {
use std::{cell::Ref, ptr::NonNull};
use std::{cell::Ref, ptr::NonNull, sync::{atomic::{AtomicU32, Ordering}, Arc}};
use crate::{system::{GraphExecutor, IntoSystem}, tests::Vec2, Commands, DynTypeId, World};
use crate::{system::{GraphExecutor, IntoSystem}, tests::Vec2, CommandQueue, Commands, DynTypeId, World};
#[test]
fn deferred_commands() {
@ -170,4 +245,28 @@ mod tests {
let vec2: Ref<Vec2> = unsafe { col.get(3) };
assert_eq!(vec2.clone(), spawned_vec);
}
/// A test that ensures a command in a command queue will only ever run once.
#[test]
fn commands_only_one_exec() {
let mut world = World::new();
let counter = Arc::new(AtomicU32::new(0));
let mut queue = CommandQueue::default();
let mut commands = Commands::new(&mut queue, &mut world);
let counter_cl = counter.clone();
commands.add(move |_world: &mut World| {
counter_cl.fetch_add(1, Ordering::AcqRel);
});
queue.execute(Some(&mut world));
assert_eq!(1, counter.load(Ordering::Acquire));
queue.execute(Some(&mut world));
// If its not one, the command somehow was executed.
// I would be surprised it wouldn't cause some segfault but still increment the counter
assert_eq!(1, counter.load(Ordering::Acquire));
}
}

View File

@ -51,6 +51,9 @@ pub mod math;
pub use lyra_ecs_derive::*;
pub use atomic_refcell::AtomicRef;
pub use atomic_refcell::AtomicRefMut;
#[cfg(test)]
mod tests;

View File

@ -1,4 +1,6 @@
use std::{marker::PhantomData, cell::{Ref, RefMut}};
use std::marker::PhantomData;
use atomic_refcell::{AtomicRef, AtomicRefMut};
use crate::{World, resource::ResourceObject};
@ -9,7 +11,7 @@ pub struct FetchResource<'a, T> {
_phantom: PhantomData<T>,
}
impl<'a, T: 'a + 'static> Fetch<'a> for FetchResource<'a, T> {
impl<'a, T: ResourceObject + 'a> Fetch<'a> for FetchResource<'a, T> {
type Item = Res<'a, T>;
fn dangling() -> Self {
@ -79,7 +81,7 @@ impl<R: ResourceObject> AsQuery for QueryResource<R> {
}
/// A struct used for querying resources from the World.
pub struct Res<'a, T>(pub(crate) Ref<'a, T>);
pub struct Res<'a, T>(pub(crate) AtomicRef<'a, T>);
impl<'a, T: ResourceObject> std::ops::Deref for Res<'a, T> {
type Target = T;
@ -98,7 +100,7 @@ pub struct FetchResourceMut<'a, T> {
_phantom: PhantomData<T>,
}
impl<'a, T: 'a + 'static> Fetch<'a> for FetchResourceMut<'a, T> {
impl<'a, T: ResourceObject + 'a> Fetch<'a> for FetchResourceMut<'a, T> {
type Item = ResMut<'a, T>;
fn dangling() -> Self {
@ -167,7 +169,7 @@ impl<R: ResourceObject> AsQuery for QueryResourceMut<R> {
}
/// A struct used for querying resources from the World.
pub struct ResMut<'a, T>(pub(crate) RefMut<'a, T>);
pub struct ResMut<'a, T>(pub(crate) AtomicRefMut<'a, T>);
impl<'a, T: ResourceObject> std::ops::Deref for ResMut<'a, T> {
type Target = T;

View File

@ -83,8 +83,10 @@ where
/// A query that fetches the origin, and target of a relation of type `R`.
///
/// It provides it as a tuple in the following format: `(origin, relation, target)`.
/// Similar to [`RelatesTo`](super::RelatesTo), you can use [`ViewState::relate_pair`] to get a view that fetches the
/// pair, or unlike [`RelatesTo`](super::RelatesTo), you can do the common procedure of using [`World::view`].
/// Similar to [`RelatesTo`](super::RelatesTo), you can use
/// [`ViewState::relate_pair`](crate::relation::ViewState::relate_pair) to get a view that
/// fetches the pair, or unlike [`RelatesTo`](super::RelatesTo), you can do the common
/// procedure of using [`World::view`].
pub struct RelatePair<R: Relation> {
_marker: PhantomData<R>,
}

View File

@ -1,26 +1,40 @@
use std::{any::{TypeId, Any}, cell::{RefCell, Ref, RefMut}};
use std::any::{TypeId, Any};
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
/// Shorthand for `Send + Sync + 'static`, so it never needs to be implemented manually.
pub trait ResourceObject: 'static {}
impl<T: 'static> ResourceObject for T {}
pub trait ResourceObject: Send + Sync + Any {
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
}
impl<T: Send + Sync + Any> ResourceObject for T {
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
/// A type erased storage for a Resource.
pub struct ResourceData {
pub(crate) data: Box<RefCell<dyn Any>>,
pub(crate) data: Box<AtomicRefCell<dyn ResourceObject>>,
type_id: TypeId,
}
impl ResourceData {
pub fn new<T: Any>(data: T) -> Self {
pub fn new<T: ResourceObject>(data: T) -> Self {
Self {
data: Box::new(RefCell::new(data)),
data: Box::new(AtomicRefCell::new(data)),
type_id: TypeId::of::<T>(),
}
}
/// Returns a boolean indicating whether or not `T`` is of the same type of the Resource
pub fn is<T: 'static>(&self) -> bool {
pub fn is<T: ResourceObject>(&self) -> bool {
self.type_id == TypeId::of::<T>()
}
@ -30,8 +44,8 @@ impl ResourceData {
///
/// * If the data is already borrowed mutably, this will panic.
/// * If the type of `T` is not the same as the resource type.
pub fn get<T: 'static>(&self) -> Ref<T> {
Ref::map(self.data.borrow(), |a| a.downcast_ref().unwrap())
pub fn get<T: ResourceObject>(&self) -> AtomicRef<T> {
AtomicRef::map(self.data.borrow(), |a| a.as_any().downcast_ref().unwrap())
}
/// Mutably borrow the data inside of the resource.
@ -40,8 +54,8 @@ impl ResourceData {
///
/// * If the data is already borrowed mutably, this will panic.
/// * If the type of `T` is not the same as the resource type.
pub fn get_mut<T: 'static>(&self) -> RefMut<T> {
RefMut::map(self.data.borrow_mut(), |a| a.downcast_mut().unwrap())
pub fn get_mut<T: ResourceObject>(&self) -> AtomicRefMut<T> {
AtomicRefMut::map(self.data.borrow_mut(), |a| a.as_any_mut().downcast_mut().unwrap())
}
/// Borrow the data inside of the resource.
@ -49,10 +63,9 @@ impl ResourceData {
/// # Panics
///
/// * If the type of `T` is not the same as the resource type.
pub fn try_get<T: 'static>(&self) -> Option<Ref<T>> {
pub fn try_get<T: ResourceObject>(&self) -> Option<AtomicRef<T>> {
self.data.try_borrow()
.map(|r| Ref::map(r, |a| a.downcast_ref().unwrap()))
.map(|r| AtomicRef::map(r, |a| a.as_any().downcast_ref().unwrap()))
.ok()
}
@ -61,9 +74,9 @@ impl ResourceData {
/// # Panics
///
/// * If the type of `T` is not the same as the resource type.
pub fn try_get_mut<T: 'static>(&self) -> Option<RefMut<T>> {
pub fn try_get_mut<T: ResourceObject>(&self) -> Option<AtomicRefMut<T>> {
self.data.try_borrow_mut()
.map(|r| RefMut::map(r, |a| a.downcast_mut().unwrap()))
.map(|r| AtomicRefMut::map(r, |a| a.as_any_mut().downcast_mut().unwrap()))
.ok()
}
}

View File

@ -58,7 +58,8 @@ impl GraphExecutor {
}
/// Executes the systems in the graph
pub fn execute(&mut self, mut world_ptr: NonNull<World>, stop_on_error: bool) -> Result<Vec<GraphExecutorError>, GraphExecutorError> {
pub fn execute(&mut self, mut world_ptr: NonNull<World>, stop_on_error: bool)
-> Result<Vec<GraphExecutorError>, GraphExecutorError> {
let mut stack = VecDeque::new();
let mut visited = HashSet::new();
@ -99,23 +100,15 @@ impl GraphExecutor {
let mut commands = Commands::new(&mut queue, world);
let world = unsafe { world_ptr.as_mut() };
if let Err(e) = commands.execute(world)
.map_err(|e| GraphExecutorError::Command(e)) {
if stop_on_error {
return Err(e);
}
possible_errors.push(e);
unimplemented!("Cannot resume topological execution from error"); // TODO: resume topological execution from error
}
commands.execute(world);
}
}
Ok(possible_errors)
}
fn topological_sort<'a>(&'a self, stack: &mut VecDeque<String>, visited: &mut HashSet<&'a str>, node: &'a GraphSystem) -> Result<(), GraphExecutorError> {
fn topological_sort<'a>(&'a self, stack: &mut VecDeque<String>,
visited: &mut HashSet<&'a str>, node: &'a GraphSystem) -> Result<(), GraphExecutorError> {
if !visited.contains(node.name.as_str()) {
visited.insert(&node.name);

View File

@ -1,4 +1,6 @@
use std::{any::TypeId, cell::{Ref, RefMut}, collections::HashMap, ptr::NonNull};
use std::{any::TypeId, collections::HashMap, ptr::NonNull};
use atomic_refcell::{AtomicRef, AtomicRefMut};
use crate::{archetype::{Archetype, ArchetypeId}, bundle::Bundle, query::{dynamic::DynamicView, AsQuery, Query, ViewState, ViewIter, ViewOne}, resource::ResourceData, ComponentInfo, DynTypeId, Entities, Entity, ResourceObject, Tick, TickTracker};
@ -271,17 +273,17 @@ impl World {
//pub fn view_one(&self, entity: EntityId) ->
pub fn add_resource<T: 'static>(&mut self, data: T) {
pub fn add_resource<T: ResourceObject>(&mut self, data: T) {
self.resources.insert(TypeId::of::<T>(), ResourceData::new(data));
}
pub fn add_resource_default<T: Default + 'static>(&mut self) {
pub fn add_resource_default<T: ResourceObject + Default>(&mut self) {
self.resources.insert(TypeId::of::<T>(), ResourceData::new(T::default()));
}
/// Get a resource from the world, or insert it into the world with the provided
/// `fn` and return it.
pub fn get_resource_or_else<T: 'static, F>(&mut self, f: F) -> RefMut<T>
pub fn get_resource_or_else<T: ResourceObject, F>(&mut self, f: F) -> AtomicRefMut<T>
where
F: Fn() -> T + 'static
{
@ -291,7 +293,7 @@ impl World {
}
/// Get a resource from the world, or insert it into the world as its default.
pub fn get_resource_or_default<T: Default + 'static>(&mut self) -> RefMut<T>
pub fn get_resource_or_default<T: ResourceObject + Default>(&mut self) -> AtomicRefMut<T>
{
self.resources.entry(TypeId::of::<T>())
.or_insert_with(|| ResourceData::new(T::default()))
@ -302,21 +304,21 @@ impl World {
///
/// Will panic if the resource is not in the world. See [`World::try_get_resource`] for
/// a function that returns an option.
pub fn get_resource<T: 'static>(&self) -> Ref<T> {
pub fn get_resource<T: ResourceObject>(&self) -> AtomicRef<T> {
self.resources.get(&TypeId::of::<T>())
.expect(&format!("World is missing resource of type '{}'", std::any::type_name::<T>()))
.get()
}
/// Returns boolean indicating if the World contains a resource of type `T`.
pub fn has_resource<T: 'static>(&self) -> bool {
pub fn has_resource<T: ResourceObject>(&self) -> bool {
self.resources.contains_key(&TypeId::of::<T>())
}
/// Attempts to get a resource from the World.
///
/// Returns `None` if the resource was not found.
pub fn try_get_resource<T: 'static>(&self) -> Option<Ref<T>> {
pub fn try_get_resource<T: ResourceObject>(&self) -> Option<AtomicRef<T>> {
self.resources.get(&TypeId::of::<T>())
.and_then(|r| r.try_get())
}
@ -325,7 +327,7 @@ impl World {
///
/// Will panic if the resource is not in the world. See [`World::try_get_resource_mut`] for
/// a function that returns an option.
pub fn get_resource_mut<T: 'static>(&self) -> RefMut<T> {
pub fn get_resource_mut<T: ResourceObject>(&self) -> AtomicRefMut<T> {
self.resources.get(&TypeId::of::<T>())
.expect(&format!("World is missing resource of type '{}'", std::any::type_name::<T>()))
.get_mut()
@ -334,7 +336,7 @@ impl World {
/// Attempts to get a mutable borrow of a resource from the World.
///
/// Returns `None` if the resource was not found.
pub fn try_get_resource_mut<T: 'static>(&self) -> Option<RefMut<T>> {
pub fn try_get_resource_mut<T: ResourceObject>(&self) -> Option<AtomicRefMut<T>> {
self.resources.get(&TypeId::of::<T>())
.and_then(|r| r.try_get_mut())
}
@ -364,6 +366,10 @@ impl World {
}
}
// TODO: Ensure that all non-send resources are only accessible on the main thread.
/* unsafe impl Send for World {}
unsafe impl Sync for World {} */
#[cfg(test)]
mod tests {
use crate::{query::TickOf, tests::{Vec2, Vec3}, Entity};
@ -440,16 +446,14 @@ mod tests {
#[test]
fn resource_multi_borrow() {
let mut world = World::new();
{
let counter = SimpleCounter(4582);
world.add_resource(counter);
}
let counter = SimpleCounter(4582);
world.add_resource(counter);
// test multiple borrows at the same time
let counter = world.get_resource::<SimpleCounter>();
assert_eq!(counter.0, 4582);
let counter2 = world.get_resource::<SimpleCounter>();
assert_eq!(counter2.0, 4582);
assert_eq!(counter.0, 4582);
assert_eq!(counter2.0, 4582);
}
@ -461,7 +465,7 @@ mod tests {
world.add_resource(counter);
}
// test multiple borrows at the same time
// test that its only possible to get a single mutable borrow
let counter = world.get_resource_mut::<SimpleCounter>();
assert_eq!(counter.0, 4582);
assert!(world.try_get_resource_mut::<SimpleCounter>().is_none());