Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 8m17s
Major rewrite.
310 lines
11 KiB
Python
310 lines
11 KiB
Python
"""Pure simulation core with no UI dependencies."""
|
|
|
|
import math
import random
import time
from dataclasses import dataclass
from typing import List, Optional, Dict, Any

from world.world import World, Position, Rotation
from world.simulation_interface import Camera
from .event_bus import EventBus, EventType, Event
from .timing import TimingController
from config.constants import *
from config.simulation_config import SimulationConfig
|
|
|
|
|
|
@dataclass
class SimulationState:
    """Snapshot of the simulation's headline counters and run flag.

    Every field has a default, so ``SimulationState()`` describes a
    simulation that has not started and has not ticked yet.
    """

    # Tick counter, mirrored from the timing controller after each tick.
    total_ticks: int = 0

    # Measured ticks-per-second, mirrored from the timing controller.
    actual_tps: float = 0.0

    # True between ``SimulationCore.start()`` and ``SimulationCore.stop()``.
    is_running: bool = False

    # Copy of the world's ``current_buffer`` taken after the latest tick.
    world_buffer: int = 0
|
|
|
|
|
|
class SimulationCore:
    """Pure simulation logic with no UI dependencies.

    Owns the world, the timing controller and a camera, advances the world
    one tick at a time, and publishes entity/tick events on an event bus.
    """

    # Wall-clock budget, in seconds, spent ticking per ``update`` call while
    # sprint mode is active.
    SPRINT_SLICE_SECONDS = 0.05

    def __init__(self, config: SimulationConfig, event_bus: Optional[EventBus] = None):
        """Create the timing controller, world, state and camera.

        Args:
            config: Simulation settings (grid size, cell size, seed, initial
                entity counts, default TPS).
            event_bus: Bus to publish events on. A new ``EventBus`` is
                created when this is ``None``.
        """
        self.config = config
        # Compare against None rather than relying on truthiness, so a bus
        # supplied by the caller is never swapped out just because it happens
        # to evaluate as falsy.
        self.event_bus = event_bus if event_bus is not None else EventBus()

        # Initialize timing controller
        self.timing = TimingController(config.default_tps, self.event_bus)

        # Initialize world. _setup_world publishes ENTITY_ADDED events, so the
        # event bus has to be assigned before this call.
        self.world = self._setup_world()

        # Simulation state
        self.state = SimulationState()
        self.is_running = False
        # One-shot flag consumed by update(): when set, the next update()
        # runs exactly one tick and clears it.
        self._stepping = False

        # Camera (for spatial calculations, not rendering)
        self.camera = Camera(
            config.grid_width * config.cell_size,
            config.grid_height * config.cell_size,
            RENDER_BUFFER,
        )

    def _setup_world(self) -> World:
        """Build the world and populate it with the initial food and cells.

        Seeds the module-level ``random`` generator with
        ``config.random_seed`` before placing anything. An ENTITY_ADDED event
        is published for every object added.

        Returns:
            The populated ``World``.
        """
        # Import locally to avoid circular imports
        from world.objects import DefaultCell, FoodObject

        config = self.config
        world = World(
            config.cell_size,
            (
                config.cell_size * config.grid_width,
                config.cell_size * config.grid_height,
            ),
        )

        # Set random seed for reproducibility
        random.seed(config.random_seed)

        # Calculate world bounds
        half_width = config.grid_width * config.cell_size // 2
        half_height = config.grid_height * config.cell_size // 2

        def random_spawn_position():
            # x is drawn before y; keeping that order keeps the sequence of
            # generated positions stable for a given seed.
            x = random.randint(-half_width // 2, half_width // 2)
            y = random.randint(-half_height // 2, half_height // 2)
            return Position(x=x, y=y)

        # Add initial food
        if config.food_spawning:
            for _ in range(config.initial_food):
                food = FoodObject(random_spawn_position())
                world.add_object(food)
                self._notify_entity_added(food)

        # Add initial cells
        for _ in range(config.initial_cells):
            cell = DefaultCell(random_spawn_position(), Rotation(angle=0))
            # Mutate the initial behavioral model for variety
            cell.behavioral_model = cell.behavioral_model.mutate(3)
            world.add_object(cell)
            self._notify_entity_added(cell)

        return world

    def start(self):
        """Start the simulation and reset the timing counters."""
        self.is_running = True
        self.state.is_running = True
        self.timing.reset_counters()

    def stop(self):
        """Stop the simulation; ``update`` becomes a no-op until restarted."""
        self.is_running = False
        self.state.is_running = False

    def pause(self):
        """Pause the simulation."""
        self.timing.set_pause(True)

    def resume(self):
        """Resume the simulation."""
        self.timing.set_pause(False)

    def step(self):
        """Execute a single simulation step, even while paused.

        The pause flag is cleared for the duration of the tick and then put
        back to whatever it was before the call.
        """
        was_paused = self.timing.state.is_paused
        self.timing.set_pause(False)
        try:
            self._tick()
        finally:
            # Restore the pause flag even if the tick raised, so a failed
            # step cannot leave a paused simulation running.
            self.timing.set_pause(was_paused)

    def update(self, delta_time: float):
        """Update simulation timing and tick if needed.

        Does nothing unless the simulation is running. Otherwise exactly one
        of the following happens: a sprint slice, a pending single step, or a
        normal timing-gated tick.

        Args:
            delta_time: Not read by this method; whether to tick is decided
                by the timing controller.
        """
        if not self.is_running:
            return

        # Handle sprint mode
        if self.timing.state.sprint_mode:
            self._handle_sprint_mode()
            return

        # Handle single step
        if self._stepping:
            self._tick()
            self._stepping = False
            return

        # Normal timing-based ticks
        if self.timing.should_tick():
            self._tick()

        # Update timing: once at least a second has passed since the timing
        # controller's last TPS timestamp, mirror its counters into the state
        # (this also covers updates in which no tick ran).
        current_time = time.perf_counter()
        if current_time - self.timing.last_tps_time >= 1.0:
            self.state.actual_tps = self.timing.actual_tps
            self.state.total_ticks = self.timing.total_ticks

    def _tick(self):
        """Execute a single simulation tick and publish its completion."""
        # Update world
        self.world.tick_all()

        # Update timing
        self.timing.update_timing()

        # Update state
        self.state.total_ticks = self.timing.total_ticks
        self.state.actual_tps = self.timing.actual_tps
        self.state.world_buffer = self.world.current_buffer

        # Notify subscribers
        self._notify_world_tick_completed()

    def _handle_sprint_mode(self):
        """Tick repeatedly for ``SPRINT_SLICE_SECONDS`` of wall-clock time."""
        if not self.timing.state.sprint_mode:
            return

        start_time = time.perf_counter()

        # Execute multiple ticks in sprint mode
        while time.perf_counter() - start_time < self.SPRINT_SLICE_SECONDS:
            self._tick()

    def set_tps(self, tps: float):
        """Set target TPS."""
        self.timing.set_tps(tps)

    def set_speed_multiplier(self, multiplier: float):
        """Set speed multiplier."""
        self.timing.set_speed_multiplier(multiplier)

    def toggle_pause(self):
        """Toggle pause state."""
        self.timing.toggle_pause()

    def toggle_sprint_mode(self):
        """Toggle sprint mode."""
        self.timing.toggle_sprint_mode()

    def get_entities_in_radius(self, position: Position, radius: float) -> List:
        """Get entities within radius of position."""
        return self.world.query_objects_within_radius(position.x, position.y, radius)

    def get_entities_in_range(self, min_pos: Position, max_pos: Position) -> List:
        """Get entities within the rectangle spanned by min_pos and max_pos."""
        return self.world.query_objects_in_range(min_pos.x, min_pos.y, max_pos.x, max_pos.y)

    def get_closest_entity(self, position: Position, max_distance: Optional[float] = None) -> Optional[Any]:
        """Get the entity closest to position.

        Args:
            position: Point to search from.
            max_distance: When given, the closest entity is only returned if
                it lies within this distance of ``position``.

        Returns:
            The closest entity, or ``None`` when the world reports none or
            the closest one is farther away than ``max_distance``.
        """
        closest = self.world.query_closest_object(position.x, position.y)
        if max_distance is None:
            return closest
        # Identity check: an entity that happens to evaluate as falsy is
        # still a valid result.
        if closest is None:
            return None

        distance = math.hypot(
            closest.position.x - position.x,
            closest.position.y - position.y,
        )
        return closest if distance <= max_distance else None

    def count_entities_by_type(self, entity_type) -> int:
        """Count world objects that are instances of entity_type."""
        return sum(
            1 for entity in self.world.get_objects() if isinstance(entity, entity_type)
        )

    def get_world_state(self) -> Dict[str, Any]:
        """Get current world state for data collection.

        Returns:
            A dict with the tick/TPS counters, the timing controller's
            settings, and entity counts (total, cells, food).
        """
        # Import locally to avoid circular imports
        from world.objects import DefaultCell, FoodObject

        # This runs after every tick, so tally all three counts in a single
        # pass over the world's objects.
        total = cells = food = 0
        for entity in self.world.get_objects():
            total += 1
            if isinstance(entity, DefaultCell):
                cells += 1
            if isinstance(entity, FoodObject):
                food += 1

        timing_state = self.timing.state
        return {
            'tick_count': self.state.total_ticks,
            'actual_tps': self.state.actual_tps,
            'world_buffer': self.state.world_buffer,
            'is_paused': timing_state.is_paused,
            'sprint_mode': timing_state.sprint_mode,
            'target_tps': timing_state.tps,
            'speed_multiplier': timing_state.speed_multiplier,
            'entity_counts': {
                'total': total,
                'cells': cells,
                'food': food,
            },
        }

    def get_entity_states(self) -> List[Dict[str, Any]]:
        """Get states of all entities for data collection.

        Returns:
            One dict per ``DefaultCell`` or ``FoodObject`` in the world, in
            the order the world yields them. Objects of any other type are
            skipped.
        """
        # Import locally to avoid circular imports
        from world.objects import DefaultCell, FoodObject

        entities = []
        for entity in self.world.get_objects():
            if isinstance(entity, DefaultCell):
                # Get neural network info safely
                nn = getattr(entity.behavioral_model, 'neural_network', None)
                if nn is not None and hasattr(nn, 'layers'):
                    # NOTE(review): this records the NUMBER of layers as a
                    # one-element list, not the size of each layer - confirm
                    # what consumers of 'layer_sizes' expect.
                    layer_sizes = [len(nn.layers)]
                else:
                    layer_sizes = [4, 6, 2]  # Default estimate

                entities.append({
                    'id': id(entity),
                    'type': 'cell',
                    'position': {'x': entity.position.x, 'y': entity.position.y},
                    'rotation': entity.rotation.angle,
                    'energy': entity.energy,
                    'age': entity.tick_count,  # tick_count represents age
                    'generation': getattr(entity, 'generation', 0),  # Default to 0 if not present
                    'neural_network': {
                        'layer_sizes': layer_sizes,
                        'has_neural_network': nn is not None,
                    },
                })
            elif isinstance(entity, FoodObject):
                entities.append({
                    'id': id(entity),
                    'type': 'food',
                    'position': {'x': entity.position.x, 'y': entity.position.y},
                    'decay': entity.decay,
                    'max_decay': entity.max_decay,
                })
        return entities

    def _notify_world_tick_completed(self):
        """Notify subscribers that world tick completed."""
        # Checked before building the payload so the world-state snapshot is
        # only computed when there is a bus to publish it on.
        if self.event_bus is None:
            return

        event = Event(
            type=EventType.WORLD_TICK_COMPLETED,
            data={
                'tick_count': self.state.total_ticks,
                'world_state': self.get_world_state(),
            },
        )
        self.event_bus.publish(event)

    def _notify_entity_added(self, entity):
        """Notify subscribers of entity addition."""
        if self.event_bus is None:
            return

        event = Event(
            type=EventType.ENTITY_ADDED,
            data={
                'entity_id': id(entity),
                'entity_type': type(entity).__name__,
                'position': {'x': entity.position.x, 'y': entity.position.y},
            },
        )
        self.event_bus.publish(event)

    def _notify_entity_removed(self, entity):
        """Notify subscribers of entity removal."""
        if self.event_bus is None:
            return

        event = Event(
            type=EventType.ENTITY_REMOVED,
            data={
                'entity_id': id(entity),
                'entity_type': type(entity).__name__,
            },
        )
        self.event_bus.publish(event)