ecs.py (11388B)
import time

from dataclasses import dataclass
from pprint import pprint
from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Tuple

from ..common import *
from .components import *
from .systems import *


def flatten_archetype(archetype):
    """
    Converts an archetype (an iterable of objects exposing a `cid`
    attribute) into a hashable tuple of those cids, preserving order.
    """
    return tuple(c.cid for c in archetype)


class EcsManager:
    """
    Manages an ECS game loop.

    Tracks entities, the components attached to them, the registered
    systems, and per-archetype buckets holding the component tuples that
    system actions are invoked with.
    """
    def __init__(self):
        ## the current entity id value. Used to ensure each created entity
        ## has a unique id. Increments whenever a new entity is added
        self.__entity_id = 0

        ## ids of all currently tracked entities
        self.entities = set()

        ## system -> {action: flattened archetype the action operates on}
        self.systems = {}

        ## flattened archetype -> {entity id: tuple of that entity's
        ## components, ordered like the archetype}
        self.archetypes = {}

        ## component cid -> {entity id: component of that cid}
        self.components = {}

        ## multiplier applied to elapsed time in process(); set to 0 by
        ## pause() and back to 1 by unpause()
        self._deltatime = 1


    def _collect_components(self, entity: int, archetype: tuple) -> Optional[tuple]:
        """
        Returns the components the entity holds for every cid of the given
        flattened archetype, in archetype order. Returns None if the entity
        lacks at least one of the required component types.
        """
        values = []
        for cid in archetype:
            owners = self.components.get(cid)
            if owners is None or entity not in owners:
                ## component type never registered, or this entity does
                ## not have a component of that type
                return None
            values.append(owners[entity])
        return tuple(values)


    def _backfill_archetype(self, archetype: tuple):
        """
        Populates the bucket of a freshly created archetype with every
        existing entity that already holds all of the required components.
        """
        if not archetype:
            ## an archetype without component types has no owner map to
            ## scan; its bucket is filled by register_component only
            return

        bucket = self.archetypes[archetype]
        ## only entities owning the first required type can possibly match
        for entity in self.components.get(archetype[0], {}):
            values = self._collect_components(entity, archetype)
            if values is not None:
                bucket[entity] = values
                debug(f'Adding entity {entity} to archetype bucket: {archetype}')


    def register_system(self, system: System):
        """
        Registers a system and the archetypes of all of its actions.
        Registering an already registered system simply refreshes its
        action map. Archetypes seen for the first time are filled with the
        entities that already satisfy them, so the registration order of
        systems and components does not matter.
        """
        debug(f'Registering system type {system}')

        if system not in self.systems:
            self.systems[system] = {}

        for action, action_archetype in system.actions().items():
            archetype = flatten_archetype(action_archetype)
            self.systems[system][action] = archetype

            if archetype not in self.archetypes:
                debug(f'New archetype encountered: {archetype}')
                self.archetypes[archetype] = {}
                self._backfill_archetype(archetype)


    def deregister_system(self, system: System):
        """
        Deregisters the given system. Archetypes that no remaining system
        uses are dropped together with their buckets. Does nothing if the
        system was not registered.
        """
        debug(f'Deregistering system type {system}')

        if (actionset := self.systems.pop(system, None)) is None:
            return

        for action_archetype in actionset.values():
            still_used = any(
                action_archetype in other_actionset.values()
                for other_actionset in self.systems.values()
            )
            if still_used:
                continue

            ## if no other systems have the same archetype, we can remove
            ## the archetype from the map. Only log when something was
            ## actually removed (two actions may share one archetype)
            if self.archetypes.pop(action_archetype, None) is not None:
                debug(f'Deregistered stale archetype: {action_archetype}')


    def create_entity(self) -> int:
        """
        Creates and starts tracking a new entity. Returns the id of the
        created entity, which can then have components registered on it.
        """
        ## we create a new entity and assign it a unique postincremented id
        entity = self.__entity_id
        self.__entity_id += 1

        ## we start tracking the entity
        self.entities.add(entity)

        return entity


    def register_component(self, entity: int, component: Component) -> Component:
        """
        Registers the given component for the given entity. If the entity
        already had a component of the same type registered, it is replaced
        and the old component is returned. Otherwise, the newly set
        component is returned.

        Raises ValueError if the entity is not tracked by this manager.
        """
        if entity not in self.entities:
            critical(f'UNKNOWN ENTITY: {entity}')
            raise ValueError('Attempted to register component for unknown entity')

        component_type = component.cid
        debug(f'Registering component type {component_type} for entity {entity}')

        ## link the component and entity, so that we can later retrieve the
        ## currently operated upon entity from a given component
        component.eid = entity

        ## if no component of the given type has yet been registered
        if component_type not in self.components:
            debug(f'New component type: {component_type}')
            self.components[component_type] = {}  ## create a hashmap for it

        owners = self.components[component_type]
        old_component = owners.get(entity)
        owners[entity] = component

        ## no component of given type was registered for the given entity
        if old_component is None:
            ## the new component may complete archetypes for this entity
            for archetype, bucket in self.archetypes.items():
                if entity in bucket:
                    ## dont add entity to bucket twice to keep debug log clear
                    continue

                values = self._collect_components(entity, archetype)
                if values is not None:
                    ## by adding the components to the archetype bucket, we
                    ## can iterate through only the components we need when
                    ## dealing with the archetype later
                    bucket[entity] = values
                    debug(f'Adding entity {entity} to archetype bucket: {archetype}')

            ## return the registered component
            return component

        warn(f'Entity {entity} has existing component of type {component_type}!')

        ## the buckets store tuples of component objects, so every bucket
        ## that captured the old component must be rebuilt; otherwise the
        ## system actions would keep receiving the replaced component
        for archetype, bucket in self.archetypes.items():
            if component_type in archetype and entity in bucket:
                bucket[entity] = self._collect_components(entity, archetype)

        return old_component


    def fetch_component(self, entity: int, component_type: int) -> Optional[Component]:
        """
        Returns the component of the given type, which was registered for the
        given entity, if one exists. Otherwise, this method returns None.

        Raises ValueError if the entity is not tracked by this manager.
        """
        if entity not in self.entities:
            critical(f'UNKNOWN ENTITY: {entity}')
            raise ValueError('Attempted to fetch component for unknown entity')

        return self.components.get(component_type, {}).get(entity)


    def deregister_component(self, entity: int, component_type: int) -> Optional[Component]:
        """
        Deregisters the given component type for the given entity. The
        entity is removed from every archetype bucket that requires that
        component type, and the removed component is returned. If no such
        component is registered, this method returns None.

        Raises ValueError if the entity is not tracked by this manager.
        """
        if entity not in self.entities:
            critical(f'UNKNOWN ENTITY: {entity}')
            raise ValueError('Attempted to deregister component for unknown entity')

        if self.fetch_component(entity, component_type) is None:
            return None

        debug(f'Deregistering component type {component_type} for entity {entity}')

        ## the entity no longer satisfies any archetype that needs this
        ## component type, so its stale component sets must go
        for archetype, bucket in self.archetypes.items():
            if component_type in archetype:
                bucket.pop(entity, None)

        ## we return the old component that we just removed
        return self.components[component_type].pop(entity)


    def destroy_entity(self, entity: int) -> List[Component]:
        """
        Removes an entity from the tracked entities. It will no longer be
        processed. Also removes the components associated with that entity.
        This method returns a list of all components that were registered
        for the given entity.

        Raises ValueError if the entity is not tracked by this manager.
        """
        if entity not in self.entities:
            critical(f'UNKNOWN ENTITY: {entity}')
            raise ValueError('Attempted to destroy unknown entity')

        ## snapshot first: deregistering mutates the maps we read from
        registered_components = [
            components[entity]
            for components in self.components.values()
            if entity in components
        ]

        ## we unregister all of our registered components
        for component in registered_components:
            self.deregister_component(entity, component.cid)

        ## buckets of archetypes without component types are not touched by
        ## deregister_component, so sweep whatever is left for this entity
        for bucket in self.archetypes.values():
            bucket.pop(entity, None)

        ## actually stop tracking the entity
        self.entities.remove(entity)

        return registered_components


    def fetch_archetype(self, archetype: Tuple[int, ...]) -> Optional[Iterable[Tuple[Component, ...]]]:
        """
        Fetch all component sets for the given flattened archetype (a tuple
        of component cids) and return them. Returns None, after logging a
        warning, if the archetype is not registered.
        """
        if archetype not in self.archetypes:
            warn(f'Tried to fetch unknown archetype {archetype}!')
            return None

        return self.archetypes[archetype].values()


    def get_deltatime(self):
        """
        Returns the multiplier applied to elapsed time (0 while paused).
        """
        return self._deltatime


    def pause(self):
        """
        Stops the flow of time: actions receive a delta time of 0.
        """
        self._deltatime = 0


    def unpause(self):
        """
        Restores the normal flow of time.
        """
        self._deltatime = 1


def setup(manager: EcsManager, screen: Screen):
    """
    Performs one-time initialisation for all registered systems.
    """
    for system in manager.systems:
        debug(f'Performing setup for system {system}')
        system.setup(manager, screen)


def cleanup(manager: EcsManager, screen: Screen):
    """
    Performs one-time cleanup for all registered systems, then deregisters
    every system and destroys every entity still tracked by the manager.
    """
    for system in manager.systems:
        debug(f'Performing cleanup for system {system}')
        system.cleanup(manager, screen)

    ## iterate over snapshots, since both calls mutate the containers
    for system in list(manager.systems):
        manager.deregister_system(system)

    for entity in list(manager.entities):
        manager.destroy_entity(entity)


def process(manager: EcsManager):
    """
    Processes systems until an EcsContinuation.Stop is returned.

    Every tick, each action is called with the elapsed time (scaled by the
    manager's deltatime), the manager, and a list of the component sets of
    its archetype. Returns immediately if no systems are registered.

    NOTE(review): the archetype and system maps are iterated directly, so
    an action that registers or deregisters a system mid-tick may trigger
    a "dictionary changed size during iteration" RuntimeError.
    """
    if not manager.systems:
        warn('No systems have been registered!')
        return

    ## a monotonic clock cannot go backwards when the system clock is
    ## adjusted, so the computed delta time is never negative
    last_tick_time = time.monotonic()
    while True:
        current_tick_time = time.monotonic()
        dt = (current_tick_time - last_tick_time) * manager.get_deltatime()

        for archetype, component_sets in manager.archetypes.items():
            for system, actionset in manager.systems.items():
                for action, action_archetype in actionset.items():
                    if archetype != action_archetype:
                        continue

                    ## hand the action a copy, so that it may add or remove
                    ## components without invalidating what it iterates
                    components = list(component_sets.values())
                    if action(dt, manager, components) == EcsContinuation.Stop:
                        debug(f'System action {action} stopped ECS')
                        return

        last_tick_time = current_tick_time