systems.py (24749B)
import abc
import random
import time

from math import cos, pi, sin
#from profilehooks import profile
from typing import Any, Callable, Dict, Iterator, Tuple, TypeVar

from ..common import *
from .components import *
from ..platform import *


## takes dt (seconds), manager (EcsManager instance), components
SystemAction = Callable[[float, Any, Iterator[Tuple[Component, ...]]], EcsContinuation]


class System(metaclass=abc.ABCMeta):
    """
    Defines the interface for a 'System': a collection of actions, each of
    which handles the entities matching one component archetype.

    Any class exposing callable 'actions', 'setup' and 'cleanup' attributes is
    treated as a (virtual) subclass, so concrete systems do not have to inherit
    from this class explicitly.
    """
    @classmethod
    def __subclasshook__(cls, subclass):
        ## structural check: all three interface methods must exist and be callable
        return all(
            callable(getattr(subclass, name, None))
            for name in ('actions', 'setup', 'cleanup'))


    @abc.abstractmethod
    def actions(self) -> Dict[SystemAction, Tuple[type, ...]]:
        """
        Returns a mapping from each action of this system to the tuple of
        component types (the archetype) that action operates on.
        """
        raise NotImplementedError


    @abc.abstractmethod
    def setup(self, manager: Any, screen: Screen):
        """
        Perform one-time initialisation of the system.
        """
        raise NotImplementedError


    @abc.abstractmethod
    def cleanup(self, manager: Any, screen: Screen):
        """
        Perform one-time cleanup of the system before shutdown.
        """
        raise NotImplementedError


class SystemBase:
    """
    Implements functionality common to each system.
    """
    def __hash__(self) -> int:
        ## NOTE(review): 'sid' is not assigned in this class; presumably it is
        ## provided by the AutoId.system decorator -- confirm.
        return self.sid


@AutoId.system
class Render2DSystem(SystemBase):
    """
    System to handle rendering code.
    """
    def actions(self):
        return {
            self.process_active: (Transform2D, ScreenElement),
            self.process_stale: (StaleTag,),
        }


    def setup(self, manager, screen):
        self.screen = screen


    def process_active(self, dt, manager, components) -> EcsContinuation:
        """
        Moves every screen element to the position of its transform and then
        ticks the screen once.
        """
        for transform, element in components:
            ## translate coords into screen space; vertices are a flat list in
            ## which even-indexed elements are x coords and odd-indexed
            ## elements are y coords
            coords = [
                vertex + (transform.px if i % 2 == 0 else transform.py)
                for i, vertex in enumerate(element.vertices)
            ]

            self.screen.set_coords(element.handle, coords)

        self.screen.tick(dt, manager)

        return EcsContinuation.Continue


    def process_stale(self, dt, manager, components) -> EcsContinuation:
        """
        Destroys every entity flagged with a StaleTag, removing its screen
        element first when it has one.
        """
        for tag, in components:
            entity = tag.eid

            element = manager.fetch_component(entity, ScreenElement.cid)
            if element is not None:
                self.screen.remove(element.handle)

            manager.destroy_entity(entity)

        return EcsContinuation.Continue


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class Collider2DSystem(SystemBase):
    """
    System to flag colliding objects (right now we only process bullet collisions).
    """
    def actions(self):
        return {
            self.process: (Transform2D, Collider2D),
        }


    def setup(self, manager, screen):
        pass


    def do_intersect(self, transform_a, collider_a, transform_b, collider_b):
        """
        Returns True when the two axis-aligned boxes overlap. Each box is
        centred on its transform (px, py) and has size (sx, sy); boxes that
        merely touch do not count as overlapping.
        """
        return (
            transform_a.px - collider_a.sx / 2 < transform_b.px + collider_b.sx / 2 and
            transform_a.px + collider_a.sx / 2 > transform_b.px - collider_b.sx / 2 and
            transform_a.py - collider_a.sy / 2 < transform_b.py + collider_b.sy / 2 and
            transform_a.py + collider_a.sy / 2 > transform_b.py - collider_b.sy / 2
        )


    def _on_opposing_teams(self, manager, bullet, other):
        """
        Returns True when 'bullet' and 'other' do not carry the same set of
        team tags (PlayerTag / EnemyTag).
        """
        ## compare tag *presence*, not the tag objects themselves: two distinct
        ## tag instances may compare unequal, which would make a bullet hurt
        ## the very entity that fired it
        for tag_type in (PlayerTag, EnemyTag):
            bullet_has = manager.fetch_component(bullet, tag_type.cid) is not None
            other_has = manager.fetch_component(other, tag_type.cid) is not None
            if bullet_has != other_has:
                return True

        return False


    ## TODO: implement spatial hashing
    #@profile
    def process(self, dt, manager, components) -> EcsContinuation:
        """
        Checks every unordered pair of colliders. When a bullet overlaps a
        non-bullet entity that has Lives and belongs to a different team, the
        entity loses one life and the bullet is flagged as stale.
        """
        ## materialise so that pairs can be sliced even when the manager hands
        ## us a one-shot iterator
        bodies = list(components)

        for idx, (transform_a, collider_a) in enumerate(bodies):
            entity_a = transform_a.eid
            ## invariant for the whole inner loop, so look it up only once
            a_is_bullet = manager.fetch_component(entity_a, BulletTag.cid) is not None

            for transform_b, collider_b in bodies[idx + 1:]:
                entity_b = transform_b.eid

                if entity_a == entity_b:
                    continue

                ## only register collisions between a bullet and non-bullet
                b_is_bullet = manager.fetch_component(entity_b, BulletTag.cid) is not None
                if a_is_bullet == b_is_bullet:
                    continue

                if a_is_bullet:
                    bullet, other = entity_a, entity_b
                else:
                    bullet, other = entity_b, entity_a

                if not self.do_intersect(transform_a, collider_a, transform_b, collider_b):
                    continue

                ## TODO: consider splitting collision resolution into separate system?
                #manager.register_component(entity_a, Collision(entity_b))
                #manager.register_component(entity_b, Collision(entity_a))

                lives = manager.fetch_component(other, Lives.cid)
                if lives is None:
                    continue

                if self._on_opposing_teams(manager, bullet, other):
                    lives.count -= 1
                    manager.register_component(bullet, StaleTag())

        return EcsContinuation.Continue


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class EdgeHarmSystem(SystemBase):
    """
    System to manage edge harm components.
    """
    def actions(self):
        return {
            self.process: (Transform2D, Collider2D, EdgeHarm),
        }


    def setup(self, manager, screen):
        pass


    def process(self, dt, manager, components) -> EcsContinuation:
        """
        When an entity's lower half-extent passes the edge_harm.py line, the
        target of the EdgeHarm (if it has Lives) loses one life and the entity
        is flagged as stale.
        """
        for transform, collider, edge_harm in components:
            half_height = collider.sy // 2

            if transform.py <= edge_harm.py - half_height:
                continue

            target_lives = manager.fetch_component(edge_harm.target, Lives.cid)
            if target_lives is not None:
                target_lives.count -= 1

            manager.register_component(transform.eid, StaleTag())

        return EcsContinuation.Continue


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class Physics2DSystem(SystemBase):
    """
    System to handle 2D physics.
    """
    def actions(self):
        return {
            self.process: (Transform2D, Velocity2D),
        }


    def setup(self, manager, screen):
        pass


    def process(self, dt, manager, components) -> EcsContinuation:
        """
        Integrates each position by its velocity over 'dt' seconds.
        """
        for transform, velocity in components:
            transform.px += velocity.vx * dt
            transform.py += velocity.vy * dt

        return EcsContinuation.Continue


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class UserInputSystem(SystemBase):
    """
    System to handle user input (keypresses).
    """
    def actions(self):
        return {
            self.process: (Transform2D, Collider2D, Velocity2D, UserInput),
        }


    def setup(self, manager, screen):
        ## the bits which should be set when the control event is raised
        self.input_masks = {
            'left': 0b00000001,
            'right': 0b00000010,
            'up': 0b00000100,
            'down': 0b00001000,
            'fire_primary': 0b00010000,
            'fire_secondary': 0b00100000,
            'menu': 0b01000000,
            'boss': 0b10000000,
        }

        ## the bits which should be unset when the control event is raised
        ## (pressing a direction clears the bit of the opposite direction)
        self.input_reset_masks = {
            'left': 0b11111101,
            'right': 0b11111110,
            'up': 0b11110111,
            'down': 0b11111011,
            'fire_primary': 0b11111111,
            'fire_secondary': 0b11111111,
            'menu': 0b11111111,
            'boss': 0b11111111,
        }

        ## maps a key to a control event
        self.controls = {
            'escape': 'menu',
            'space': 'boss',
            'a': 'left',
            'd': 'right',
            'w': 'up',
            's': 'down',
            'j': 'fire_primary',
            'k': 'fire_secondary',
        }

        ## Gradius is that you?!?
        self.completed_easter_egg = False
        self.easter_egg_idx = 0
        self.easter_egg = [
            'up',
            'up',
            'down',
            'down',
            'left',
            'right',
            'left',
            'right',
            'b',
            'a']

        ## the current input value
        self.input_bitmask = 0

        ## bitmask used to get the inverse input_mask
        self.inverse_bitmask = 0b11111111

        self.primary_cooldown = 0.15
        self.can_shoot_primary = True

        self.secondary_cooldown = 5
        self.can_shoot_secondary = True

        self.boss_key_cooldown = 0.2
        self.can_toggle_boss_key = True

        self.menu_key_cooldown = 0.2
        self.can_toggle_menu_key = True

        self.currently_paused = False
        self.pause_initiator = ''

        self.screen = screen
        self.screen.set_event_handler('<KeyPress>', self.handle_key_pressed)
        self.screen.set_event_handler('<KeyRelease>', self.handle_key_released)
        self.screen.set_proto_handler('WM_DELETE_WINDOW', self.handle_window_closed)
        self.screen.set_quit_handler(self.handle_window_closed)

        self.continuation = EcsContinuation.Continue


    def reset_boss_key(self):
        self.can_toggle_boss_key = True


    def reset_menu_key(self):
        self.can_toggle_menu_key = True


    def reset_primary(self):
        self.can_shoot_primary = True


    def reset_secondary(self):
        self.can_shoot_secondary = True


    def check_mask(self, event):
        """
        Returns True when every bit of the given control event is currently
        set in the input bitmask.
        """
        mask = self.input_masks[event]
        return (self.input_bitmask & mask) == mask


    def _toggle_pause(self, manager, initiator, toggle_overlay):
        """
        Pauses the game on behalf of 'initiator', or unpauses it when that
        same initiator paused it, then toggles the matching overlay.
        """
        if not self.currently_paused:
            manager.pause()
            self.currently_paused = True
            self.pause_initiator = initiator
        elif self.pause_initiator == initiator:
            manager.unpause()
            self.currently_paused = False

        ## allow 1 menu at one time
        if self.pause_initiator == initiator:
            toggle_overlay()


    def process(self, dt, manager, components) -> EcsContinuation:
        """
        Applies the current input bitmask to every user-controlled entity:
        pause overlays, movement (clamped to the playing field) and firing.

        Returns Stop once the window has been closed, Continue otherwise.
        """
        for transform, collider, velocity, user_input in components:
            entity = transform.eid

            if self.completed_easter_egg:
                print('GAMER MOMENT')
                lives = manager.fetch_component(entity, Lives.cid)
                if lives is not None:
                    lives.count = 30
                self.completed_easter_egg = False

            if self.check_mask('menu') and self.can_toggle_menu_key:
                self._toggle_pause(manager, 'menu', self.screen.toggle_menu)

                self.can_toggle_menu_key = False
                self.screen.do_after(self.menu_key_cooldown, self.reset_menu_key)

            if self.check_mask('boss') and self.can_toggle_boss_key:
                self._toggle_pause(manager, 'boss', self.screen.toggle_boss_image)

                self.can_toggle_boss_key = False
                self.screen.do_after(self.boss_key_cooldown, self.reset_boss_key)

            vx, vy = 0, 0
            if self.check_mask('left'):
                vx = -1
            if self.check_mask('right'):
                vx = 1
            if self.check_mask('up'):
                vy = -1
            if self.check_mask('down'):
                vy = 1

            ## we normalise the entities velocity vector to ensure that the
            ## entity cannot use diagonal movement to break the speed barrier
            ## (a zero vector needs no scaling and is left untouched)
            magnitude = (vx * vx + vy * vy) ** 0.5
            if magnitude:
                vx = (vx / magnitude) * user_input.speed
                vy = (vy / magnitude) * user_input.speed

            ## clamp the user to the playing field
            dx, dy = collider.sx // 2, collider.sy // 2
            if transform.px < dx:
                transform.px += 1
                vx = 0
            elif WIDTH - dx < transform.px:
                transform.px -= 1
                vx = 0
            if transform.py < dy:
                transform.py += 1
                vy = 0
            elif HEIGHT - dy < transform.py:
                transform.py -= 1
                vy = 0

            ## actually assign the velocity
            velocity.vx = vx
            velocity.vy = vy

            if self.check_mask('fire_primary') and self.can_shoot_primary:
                manager.register_component(entity, FireLinearBulletEmitter())

                self.can_shoot_primary = False
                self.screen.do_after(self.primary_cooldown, self.reset_primary)

            if self.check_mask('fire_secondary') and self.can_shoot_secondary:
                manager.register_component(entity, FireRadialBulletEmitter())

                self.can_shoot_secondary = False
                self.screen.do_after(self.secondary_cooldown, self.reset_secondary)

        return self.continuation


    def move_next_code_elem(self, key):
        """
        Advances the easter egg code with the (lower-cased) key that was just
        pressed and flags completion once the whole code has been entered.
        """
        if self.easter_egg_idx == -1: ## disallow multiple code inputs
            return

        code = self.easter_egg
        entered = code[:self.easter_egg_idx] + [key]

        ## fall back to the longest tail of what was typed that is still a
        ## prefix of the code, so that e.g. an extra leading 'up' does not
        ## throw away an otherwise valid attempt
        for length in range(len(entered), 0, -1):
            if entered[-length:] == code[:length]:
                self.easter_egg_idx = length
                break
        else:
            self.easter_egg_idx = 0

        if self.easter_egg_idx == len(code):
            self.easter_egg_idx = -1
            self.completed_easter_egg = True


    def _binding_for(self, keysym):
        """
        Returns the control event bound to 'keysym' (trying the exact keysym
        first, then its lower-case form), or None when the key is unbound or
        its control event has no masks.
        """
        binding = self.controls.get(keysym)
        if binding is None:
            binding = self.controls.get(keysym.lower())

        if binding in self.input_masks and binding in self.input_reset_masks:
            return binding

        return None


    def handle_key_pressed(self, event):
        self.move_next_code_elem(event.keysym.lower())

        binding = self._binding_for(event.keysym)
        if binding is None:
            return

        self.input_bitmask &= self.input_reset_masks[binding]
        self.input_bitmask |= self.input_masks[binding]


    def handle_key_released(self, event):
        binding = self._binding_for(event.keysym)
        if binding is None:
            return

        ## clear only the bits belonging to the released control event
        self.input_bitmask &= self.inverse_bitmask ^ self.input_masks[binding]


    def handle_window_closed(self):
        self.continuation = EcsContinuation.Stop


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class LifeSystem(SystemBase):
    """
    Clears dead entities.
    """
    def actions(self):
        return {
            self.process: (Lives,),
        }


    def setup(self, manager, screen):
        self.screen = screen
        self.continuation = EcsContinuation.Continue


    def process(self, dt, manager, components) -> EcsContinuation:
        """
        Flags every entity without remaining lives as stale. A dead player
        pauses the game and shows the game-over screen; each dead enemy adds
        one point to the player's score.

        Returns Stop once the game-over callback has fired, Continue otherwise.
        """
        player = -1
        score_delta = 0

        for lives, in components:
            entity = lives.eid

            is_player = manager.fetch_component(entity, PlayerTag.cid) is not None
            if is_player:
                player = entity

            if lives.count > 0:
                continue

            if is_player:
                manager.pause()
                self.screen.set_tracked_entity(-1)

                def callback():
                    self.continuation = EcsContinuation.Stop

                self.screen.toggle_gameover(callback)

            if manager.fetch_component(entity, EnemyTag.cid) is not None:
                score_delta += 1

            manager.register_component(entity, StaleTag())

        if player != -1:
            ## the player is not guaranteed to carry a Score component
            player_score = manager.fetch_component(player, Score.cid)
            if player_score is not None:
                player_score.count += score_delta

        return self.continuation


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class LifespanSystem(SystemBase):
    """
    System to kill components that exceed their lifespan.
    """
    def actions(self):
        return {
            self.process: (Lifespan,),
        }


    def setup(self, manager, screen):
        pass


    def process(self, dt, manager, components) -> EcsContinuation:
        """
        Ages every Lifespan by 'dt' seconds and flags its entity as stale once
        the remaining time drops below zero.
        """
        for lifespan, in components:
            lifespan.ttl -= dt

            if lifespan.ttl < 0:
                manager.register_component(lifespan.eid, StaleTag())

        return EcsContinuation.Continue


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class BulletEmitterSystem(SystemBase):
    """
    Controls entities with a bullet emitter.
    """
    def actions(self):
        return {
            self.process_linear: (Transform2D, Collider2D, LinearBulletEmitter),
            self.process_radial: (Transform2D, Collider2D, RadialBulletEmitter),
        }


    def setup(self, manager, screen):
        self.screen = screen


    def create_bullet(self, manager, bullet_transform, bullet_velocity, bullet_data):
        """
        Creates a bullet entity with the given transform and velocity, draws
        it on the screen and returns the new entity.

        The fill colour cycles through bullet_data.bullet_colours, advancing
        bullet_data.bullet_colour_idx on every call.

        Raises ZeroDivisionError when bullet_data.bullet_speed is 0.
        """
        bullet = manager.create_entity()

        bullet_collider = Collider2D(
            bullet_data.bullet_size, bullet_data.bullet_size)

        bullet_screen_element = self.screen.draw_poly(
            bullet_data.bullet_vertices,
            fill=bullet_data.bullet_colours[bullet_data.bullet_colour_idx])

        bullet_data.bullet_colour_idx += 1
        bullet_data.bullet_colour_idx %= len(bullet_data.bullet_colours)

        bullet_sprite = ScreenElement(
            bullet_screen_element, bullet_data.bullet_vertices)

        ## ensure that bullets can travel the diagonal of the playfield at least
        lifespan = (((HEIGHT ** 2) + (WIDTH ** 2)) ** 0.5) / abs(bullet_data.bullet_speed)
        bullet_lifespan = Lifespan(lifespan)

        manager.register_component(bullet, bullet_transform)
        manager.register_component(bullet, bullet_collider)
        manager.register_component(bullet, bullet_velocity)
        manager.register_component(bullet, bullet_lifespan)
        manager.register_component(bullet, bullet_sprite)
        manager.register_component(bullet, BulletTag())

        return bullet


    def _team_tag_type(self, manager, entity):
        """
        Returns the team tag class carried by 'entity' (PlayerTag takes
        precedence over EnemyTag), or None when it carries neither.
        """
        if manager.fetch_component(entity, PlayerTag.cid) is not None:
            return PlayerTag
        if manager.fetch_component(entity, EnemyTag.cid) is not None:
            return EnemyTag

        return None


    def process_linear(self, dt, manager, components) -> EcsContinuation:
        """
        Fires a single bullet along the y axis from every emitter that carries
        a FireLinearBulletEmitter request, consuming the request. The bullet
        inherits the team tag of the entity that fired it.
        """
        if dt == 0: ## dont shoot when paused
            return EcsContinuation.Continue

        for transform, collider, emitter in components:
            entity = transform.eid

            tag = manager.fetch_component(entity, FireLinearBulletEmitter.cid)
            if tag is None:
                continue

            manager.deregister_component(entity, tag.cid)

            bullet_transform = Transform2D(transform.px, transform.py, 0)
            bullet_velocity = Velocity2D(0, emitter.data.bullet_speed)

            bullet = self.create_bullet(
                manager, bullet_transform, bullet_velocity, emitter.data)

            team_tag = self._team_tag_type(manager, entity)
            if team_tag is not None:
                manager.register_component(bullet, team_tag())

        return EcsContinuation.Continue


    def process_radial(self, dt, manager, components) -> EcsContinuation:
        """
        Fires emitter.bullet_count bullets, evenly spread over a full circle,
        from every emitter that carries a FireRadialBulletEmitter request,
        consuming the request. Bullets inherit the team tag of the entity that
        fired them.
        """
        if dt == 0: ## dont shoot when paused
            return EcsContinuation.Continue

        for transform, collider, emitter in components:
            entity = transform.eid

            tag = manager.fetch_component(entity, FireRadialBulletEmitter.cid)
            if tag is None:
                continue

            manager.deregister_component(entity, tag.cid)

            ## nothing to fire; this also avoids dividing by zero below
            if emitter.bullet_count <= 0:
                continue

            sweep_increment = 2 * pi / emitter.bullet_count

            bullet_data = emitter.data
            speed = bullet_data.bullet_speed

            ## the firing entity is the same for every bullet of the volley
            team_tag = self._team_tag_type(manager, entity)

            for n in range(emitter.bullet_count):
                bullet_transform = Transform2D(transform.px, transform.py, 0)

                theta = emitter.bullet_arc_offset + (n * sweep_increment)

                ## Clockwise Rotation matrix:
                ## [  cos0  sin0 ] [ x ] = [  x*cos0 + y*sin0 ]
                ## [ -sin0  cos0 ] [ y ]   [ -x*sin0 + y*cos0 ]
                ## Since we only have a 1-dimensional 'speed', we assign it to
                ## 'y' (completely arbitrarily) and then calculate the [vx vy]
                ## vector for the rotated projectile (we assume x = 0)
                vx, vy = sin(theta), cos(theta)

                bullet_velocity = Velocity2D(vx * speed, vy * speed)

                bullet = self.create_bullet(
                    manager, bullet_transform, bullet_velocity, bullet_data)

                if team_tag is not None:
                    manager.register_component(bullet, team_tag())

        return EcsContinuation.Continue


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class SpawnerSystem(SystemBase):
    """
    Manages all of the spawners in the game.
    """
    def actions(self):
        return {
            self.process: (Spawner,),
        }


    def setup(self, manager, screen):
        self.screen = screen
        ## remaining cooldown (seconds) per spawner entity
        self.cooldowns = {}


    def process(self, dt, manager, components) -> EcsContinuation:
        """
        Counts down the cooldown of every spawner; once it elapses the spawner
        instantiates an entity at the next position yielded by its generator,
        which also yields the next cooldown.
        """
        for spawner, in components:
            entity = spawner.eid
            cooldown_remaining = self.cooldowns.get(entity, 0) - dt

            if cooldown_remaining <= 0:
                ## NOTE(review): next() raises StopIteration if the generator
                ## can be exhausted -- confirm that spawn generators are endless
                cooldown_remaining, spawn_position = next(spawner.spawn_generator)
                spawn_px, spawn_py = spawn_position
                spawner.instantiate((spawn_px, spawn_py), manager, self.screen)

            self.cooldowns[entity] = cooldown_remaining

        return EcsContinuation.Continue


    def cleanup(self, manager, screen):
        pass


@AutoId.system
class EnemyEmitterSystem(SystemBase):
    """
    Manages enemies with bullet emitters.
    """
    def actions(self):
        return {
            self.process: (EnemyEmitterCooldown,),
        }


    def setup(self, manager, screen):
        ## remaining cooldown (seconds) per enemy entity
        ## NOTE(review): entries are never removed when an enemy is destroyed
        self.cooldowns = {}


    def process(self, dt, manager, components) -> EcsContinuation:
        """
        Counts down the cooldown of every enemy; once it elapses the enemy is
        asked to fire both its linear and radial emitters and a new random
        cooldown is drawn.
        """
        for emitter_cooldown, in components:
            entity = emitter_cooldown.eid

            remaining_cooldown = self.cooldowns.get(entity, 0) - dt

            if remaining_cooldown <= 0:
                manager.register_component(entity, FireLinearBulletEmitter())
                manager.register_component(entity, FireRadialBulletEmitter())

                ## NOTE(review): randint needs integer bounds -- confirm that
                ## min_cooldown / max_cooldown are always ints
                remaining_cooldown = random.randint(
                    emitter_cooldown.min_cooldown, emitter_cooldown.max_cooldown)

            self.cooldowns[entity] = remaining_cooldown

        return EcsContinuation.Continue


    def cleanup(self, manager, screen):
        pass