DynamicAbstractionSystem/core/simulation_core.py
Sam 3a34759094
Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 8m17s
Add core simulation components and configuration classes
Major rewrite.
2025-11-08 19:17:40 -06:00

310 lines
11 KiB
Python

"""Pure simulation core with no UI dependencies."""
import time
import random
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from world.world import World, Position, Rotation
from world.simulation_interface import Camera
from .event_bus import EventBus, EventType, Event
from .timing import TimingController
from config.constants import *
from config.simulation_config import SimulationConfig
@dataclass
class SimulationState:
"""Current simulation state."""
total_ticks: int = 0
actual_tps: float = 0.0
is_running: bool = False
world_buffer: int = 0
class SimulationCore:
"""Pure simulation logic with no UI dependencies."""
def __init__(self, config: SimulationConfig, event_bus: Optional[EventBus] = None):
self.config = config
self.event_bus = event_bus or EventBus()
# Initialize timing controller
self.timing = TimingController(config.default_tps, self.event_bus)
# Initialize world
self.world = self._setup_world()
# Simulation state
self.state = SimulationState()
self.is_running = False
# Camera (for spatial calculations, not rendering)
self.camera = Camera(
config.grid_width * config.cell_size,
config.grid_height * config.cell_size,
RENDER_BUFFER
)
def _setup_world(self) -> World:
"""Setup the simulation world."""
# Import locally to avoid circular imports
from world.objects import DefaultCell, FoodObject
world = World(
self.config.cell_size,
(
self.config.cell_size * self.config.grid_width,
self.config.cell_size * self.config.grid_height
)
)
# Set random seed for reproducibility
random.seed(self.config.random_seed)
# Calculate world bounds
half_width = self.config.grid_width * self.config.cell_size // 2
half_height = self.config.grid_height * self.config.cell_size // 2
# Add initial food
if self.config.food_spawning:
for _ in range(self.config.initial_food):
x = random.randint(-half_width // 2, half_width // 2)
y = random.randint(-half_height // 2, half_height // 2)
food = FoodObject(Position(x=x, y=y))
world.add_object(food)
self._notify_entity_added(food)
# Add initial cells
for _ in range(self.config.initial_cells):
cell = DefaultCell(
Position(
x=random.randint(-half_width // 2, half_width // 2),
y=random.randint(-half_height // 2, half_height // 2)
),
Rotation(angle=0)
)
# Mutate the initial behavioral model for variety
cell.behavioral_model = cell.behavioral_model.mutate(3)
world.add_object(cell)
self._notify_entity_added(cell)
return world
def start(self):
"""Start the simulation."""
self.is_running = True
self.state.is_running = True
self.timing.reset_counters()
def stop(self):
"""Stop the simulation."""
self.is_running = False
self.state.is_running = False
def pause(self):
"""Pause the simulation."""
self.timing.set_pause(True)
def resume(self):
"""Resume the simulation."""
self.timing.set_pause(False)
def step(self):
"""Execute a single simulation step."""
was_paused = self.timing.state.is_paused
self.timing.set_pause(False)
self._tick()
self.timing.set_pause(was_paused)
def update(self, delta_time: float):
"""Update simulation timing and tick if needed."""
if not self.is_running:
return
# Handle sprint mode
if self.timing.state.sprint_mode:
self._handle_sprint_mode()
return
# Handle single step
if hasattr(self, '_stepping') and self._stepping:
self._tick()
self._stepping = False
return
# Normal timing-based ticks
if self.timing.should_tick():
self._tick()
# Update timing
current_time = time.perf_counter()
if current_time - self.timing.last_tps_time >= 1.0:
self.state.actual_tps = self.timing.actual_tps
self.state.total_ticks = self.timing.total_ticks
def _tick(self):
"""Execute a single simulation tick."""
# Update world
self.world.tick_all()
# Update timing
self.timing.update_timing()
# Update state
self.state.total_ticks = self.timing.total_ticks
self.state.actual_tps = self.timing.actual_tps
self.state.world_buffer = self.world.current_buffer
# Notify subscribers
self._notify_world_tick_completed()
def _handle_sprint_mode(self):
"""Handle sprint mode execution."""
if not self.timing.state.sprint_mode:
return
start_time = time.perf_counter()
# Execute multiple ticks in sprint mode
while time.perf_counter() - start_time < 0.05: # 50ms of sprint
self._tick()
def set_tps(self, tps: float):
"""Set target TPS."""
self.timing.set_tps(tps)
def set_speed_multiplier(self, multiplier: float):
"""Set speed multiplier."""
self.timing.set_speed_multiplier(multiplier)
def toggle_pause(self):
"""Toggle pause state."""
self.timing.toggle_pause()
def toggle_sprint_mode(self):
"""Toggle sprint mode."""
self.timing.toggle_sprint_mode()
def get_entities_in_radius(self, position: Position, radius: float) -> List:
"""Get entities within radius of position."""
return self.world.query_objects_within_radius(position.x, position.y, radius)
def get_entities_in_range(self, min_pos: Position, max_pos: Position) -> List:
"""Get entities within rectangular range."""
return self.world.query_objects_in_range(min_pos.x, min_pos.y, max_pos.x, max_pos.y)
def get_closest_entity(self, position: Position, max_distance: float = None) -> Optional:
"""Get closest entity to position."""
if max_distance is not None:
# Filter by distance after getting closest
closest = self.world.query_closest_object(position.x, position.y)
if closest:
dx = closest.position.x - position.x
dy = closest.position.y - position.y
distance = (dx * dx + dy * dy) ** 0.5
if distance <= max_distance:
return closest
return None
else:
return self.world.query_closest_object(position.x, position.y)
def count_entities_by_type(self, entity_type) -> int:
"""Count entities of specific type."""
count = 0
for entity in self.world.get_objects():
if isinstance(entity, entity_type):
count += 1
return count
def get_world_state(self) -> Dict[str, Any]:
"""Get current world state for data collection."""
# Import locally to avoid circular imports
from world.objects import DefaultCell, FoodObject
return {
'tick_count': self.state.total_ticks,
'actual_tps': self.state.actual_tps,
'world_buffer': self.state.world_buffer,
'is_paused': self.timing.state.is_paused,
'sprint_mode': self.timing.state.sprint_mode,
'target_tps': self.timing.state.tps,
'speed_multiplier': self.timing.state.speed_multiplier,
'entity_counts': {
'total': len(list(self.world.get_objects())),
'cells': self.count_entities_by_type(DefaultCell),
'food': self.count_entities_by_type(FoodObject)
}
}
def get_entity_states(self) -> List[Dict[str, Any]]:
"""Get states of all entities for data collection."""
# Import locally to avoid circular imports
from world.objects import DefaultCell, FoodObject
entities = []
for entity in self.world.get_objects():
if isinstance(entity, DefaultCell):
# Get neural network info safely
nn = entity.behavioral_model.neural_network if hasattr(entity.behavioral_model, 'neural_network') else None
layer_sizes = [len(nn.layers)] if nn and hasattr(nn, 'layers') else [4, 6, 2] # Default estimate
entities.append({
'id': id(entity),
'type': 'cell',
'position': {'x': entity.position.x, 'y': entity.position.y},
'rotation': entity.rotation.angle,
'energy': entity.energy,
'age': entity.tick_count, # tick_count represents age
'generation': getattr(entity, 'generation', 0), # Default to 0 if not present
'neural_network': {
'layer_sizes': layer_sizes,
'has_neural_network': nn is not None
}
})
elif isinstance(entity, FoodObject):
entities.append({
'id': id(entity),
'type': 'food',
'position': {'x': entity.position.x, 'y': entity.position.y},
'decay': entity.decay,
'max_decay': entity.max_decay
})
return entities
def _notify_world_tick_completed(self):
"""Notify subscribers that world tick completed."""
if self.event_bus:
event = Event(
type=EventType.WORLD_TICK_COMPLETED,
data={
'tick_count': self.state.total_ticks,
'world_state': self.get_world_state()
}
)
self.event_bus.publish(event)
def _notify_entity_added(self, entity):
"""Notify subscribers of entity addition."""
if self.event_bus:
event = Event(
type=EventType.ENTITY_ADDED,
data={
'entity_id': id(entity),
'entity_type': type(entity).__name__,
'position': {'x': entity.position.x, 'y': entity.position.y}
}
)
self.event_bus.publish(event)
def _notify_entity_removed(self, entity):
"""Notify subscribers of entity removal."""
if self.event_bus:
event = Event(
type=EventType.ENTITY_REMOVED,
data={
'entity_id': id(entity),
'entity_type': type(entity).__name__
}
)
self.event_bus.publish(event)