Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 1m46s
390 lines
12 KiB
Python
390 lines
12 KiB
Python
"""Tests for core simulation components."""
|
|
|
|
import pytest
|
|
import time
|
|
from unittest.mock import Mock
|
|
|
|
from core.simulation_core import SimulationCore, SimulationState
|
|
from core.timing import TimingController, TimingState
|
|
from core.event_bus import EventBus, EventType, Event
|
|
from config.simulation_config import SimulationConfig
|
|
|
|
|
|
class TestTimingController:
    """Unit tests for ``TimingController``.

    Covers initial state, TPS and speed-multiplier clamping, pause and
    sprint toggling, tick gating, tick counting and the display helpers.
    """

    def test_initialization(self):
        """A new controller reports the default TPS, is unpaused and has no ticks."""
        event_bus = Mock()
        timing = TimingController(default_tps=60.0, event_bus=event_bus)

        assert timing.state.tps == 60.0
        assert not timing.state.is_paused
        assert not timing.state.sprint_mode
        assert timing.state.speed_multiplier == 1.0
        assert timing.default_tps == 60.0
        assert timing.total_ticks == 0

    def test_set_tps(self):
        """Setting TPS updates the multiplier and is clamped to [1, 1000]."""
        timing = TimingController(default_tps=60.0)

        timing.set_tps(120.0)
        assert timing.state.tps == 120.0
        assert timing.state.speed_multiplier == 2.0

        # Out-of-range requests are expected to be clamped.
        timing.set_tps(2000.0)  # Should be capped at 1000
        assert timing.state.tps == 1000.0

        timing.set_tps(0.5)  # Should be capped at 1.0
        assert timing.state.tps == 1.0

    def test_set_speed_multiplier(self):
        """Setting the multiplier updates TPS and is clamped to [0.1, 10]."""
        timing = TimingController(default_tps=60.0)

        timing.set_speed_multiplier(2.0)
        assert timing.state.speed_multiplier == 2.0
        assert timing.state.tps == 120.0

        # Out-of-range requests are expected to be clamped.
        timing.set_speed_multiplier(20.0)  # Should be capped at 10.0
        assert timing.state.speed_multiplier == 10.0

        timing.set_speed_multiplier(0.05)  # Should be capped at 0.1
        assert timing.state.speed_multiplier == 0.1

    def test_pause_toggle(self):
        """``toggle_pause`` flips the flag and ``set_pause`` sets it explicitly."""
        timing = TimingController()

        assert not timing.state.is_paused

        timing.toggle_pause()
        assert timing.state.is_paused

        timing.set_pause(False)  # Currently paused, so this unpauses
        assert not timing.state.is_paused

        timing.set_pause(True)  # Currently running, so this pauses
        assert timing.state.is_paused

        timing.toggle_pause()
        assert not timing.state.is_paused

    def test_sprint_mode(self):
        """Toggling sprint mode flips the flag and sets/clears the start time."""
        timing = TimingController()

        assert not timing.state.sprint_mode

        timing.toggle_sprint_mode()
        assert timing.state.sprint_mode
        assert timing.sprint_start_time is not None

        timing.toggle_sprint_mode()
        assert not timing.state.sprint_mode
        assert timing.sprint_start_time is None

    def test_should_tick(self):
        """``should_tick`` is False when paused, True in sprint, else time-gated."""
        timing = TimingController(default_tps=60.0)

        # When paused, should not tick
        timing.state.is_paused = True
        assert not timing.should_tick()

        # When in sprint mode, should always tick
        timing.state.is_paused = False
        timing.state.sprint_mode = True
        assert timing.should_tick()

        # Normal mode - should tick based on time. Backdate the last tick by
        # 20 ms (longer than the 16.67 ms interval at 60 TPS) instead of
        # sleeping, so the test is fast and independent of scheduler jitter.
        timing.state.sprint_mode = False
        timing.last_tick_time = time.perf_counter() - 0.02
        assert timing.should_tick()

    def test_update_timing(self):
        """One ``update_timing`` call advances both tick counters by one."""
        timing = TimingController(default_tps=60.0)

        initial_ticks = timing.total_ticks
        timing.update_timing()

        assert timing.total_ticks == initial_ticks + 1
        assert timing.tick_counter == 1

    def test_display_functions(self):
        """Display helpers round the TPS and label the current speed state."""
        timing = TimingController(default_tps=60.0)

        # Test TPS display
        timing.actual_tps = 59.7
        assert timing.get_display_tps() == 60

        # Test speed display
        assert timing.get_current_speed_display() == "1x"

        timing.state.is_paused = True
        assert timing.get_current_speed_display() == "Paused"

        timing.state.is_paused = False
        timing.state.sprint_mode = True
        assert timing.get_current_speed_display() == "Sprint"

        timing.state.sprint_mode = False
        timing.state.speed_multiplier = 2.0
        assert timing.get_current_speed_display() == "2.0x"
|
|
|
|
|
|
class TestEventBus:
    """Unit tests for ``EventBus`` subscription, delivery and history."""

    def test_initialization(self):
        """A fresh bus has no subscribers and an empty history."""
        event_bus = EventBus()

        assert len(event_bus._subscribers) == 0
        assert len(event_bus._event_history) == 0

    def test_subscribe_unsubscribe(self):
        """A callback is registered by ``subscribe`` and removed by ``unsubscribe``."""
        event_type = EventType.SIMULATION_STATE_CHANGED
        listener = Mock()
        event_bus = EventBus()

        event_bus.subscribe(event_type, listener)
        assert event_type in event_bus._subscribers
        assert listener in event_bus._subscribers[event_type]

        event_bus.unsubscribe(event_type, listener)
        assert listener not in event_bus._subscribers[event_type]

    def test_publish_and_receive(self):
        """A published event reaches its subscriber and is recorded in history."""
        listener = Mock()
        event_bus = EventBus()
        event_bus.subscribe(EventType.WORLD_TICK_COMPLETED, listener)

        tick_event = Event(
            type=EventType.WORLD_TICK_COMPLETED,
            data={'tick_count': 100},
        )
        event_bus.publish(tick_event)

        listener.assert_called_once_with(tick_event)
        assert len(event_bus._event_history) == 1
        assert event_bus._event_history[0] == tick_event

    def test_multiple_subscribers(self):
        """Every subscriber of an event type receives the published event once."""
        event_bus = EventBus()
        listeners = (Mock(), Mock())
        for listener in listeners:
            event_bus.subscribe(EventType.ENTITY_ADDED, listener)

        added_event = Event(type=EventType.ENTITY_ADDED, data={'entity_id': 123})
        event_bus.publish(added_event)

        for listener in listeners:
            listener.assert_called_once_with(added_event)

    def test_event_history(self):
        """History supports recent-N lookup, type filtering and clearing."""
        event_bus = EventBus()

        # Publish five timing events carrying ticks 0..4.
        for tick in range(5):
            event_bus.publish(
                Event(type=EventType.TIMING_UPDATE, data={'tick': tick})
            )

        assert len(event_bus._event_history) == 5

        # The three most recent events come back oldest-first.
        latest = event_bus.get_recent_events(count=3)
        assert len(latest) == 3
        oldest_tick = latest[0].data['tick']
        newest_tick = latest[2].data['tick']
        assert oldest_tick == 2  # Third from last
        assert newest_tick == 4  # Most recent

        # Filtering by type returns all five timing events.
        only_timing = event_bus.get_recent_events(
            event_type=EventType.TIMING_UPDATE
        )
        assert len(only_timing) == 5

        # Clearing empties the history.
        event_bus.clear_history()
        assert len(event_bus._event_history) == 0

    def test_callback_error_handling(self):
        """A raising subscriber must not make ``publish`` raise."""
        event_type = EventType.SIMULATION_STATE_CHANGED
        event_bus = EventBus()
        healthy_listener = Mock()

        def failing_listener(event):
            raise ValueError("Test error")

        # NOTE(review): the healthy listener is registered before the failing
        # one, so this does not by itself show that delivery continues past a
        # failure - confirm whether that ordering is intended.
        event_bus.subscribe(event_type, healthy_listener)
        event_bus.subscribe(event_type, failing_listener)

        # Publishing must complete without propagating the ValueError.
        event_bus.publish(Event(type=event_type, data={}))

        # The healthy listener must have been invoked exactly once.
        healthy_listener.assert_called_once()
|
|
|
|
|
|
class TestSimulationCore:
    """Unit tests for ``SimulationCore``.

    Covers construction, lifecycle (start/stop/pause/resume/step), timing
    control pass-throughs, entity queries and state snapshots.
    """

    def test_initialization(self):
        """Construction wires up world, timing and event bus; nothing is running."""
        config = SimulationConfig(
            grid_width=20,
            grid_height=15,
            initial_cells=5,
            initial_food=10,
        )
        core = SimulationCore(config)

        assert core.config == config
        assert core.world is not None
        assert core.timing is not None
        assert core.event_bus is not None
        assert core.state.total_ticks == 0
        assert not core.is_running

    def test_start_stop(self):
        """``start`` and ``stop`` flip both the core flag and the state flag."""
        config = SimulationConfig(initial_cells=0, initial_food=0)
        core = SimulationCore(config)

        assert not core.is_running
        assert not core.state.is_running

        core.start()
        try:
            assert core.is_running
            assert core.state.is_running
        finally:
            # Always stop, so a failed assertion cannot leave the
            # simulation started for later tests.
            core.stop()

        assert not core.is_running
        assert not core.state.is_running

    def test_pause_resume(self):
        """``pause`` and ``resume`` toggle the timing controller's paused flag."""
        config = SimulationConfig(initial_cells=0, initial_food=0)
        core = SimulationCore(config)

        core.start()
        try:
            # Pause
            core.pause()
            assert core.timing.state.is_paused

            # Resume
            core.resume()
            assert not core.timing.state.is_paused
        finally:
            # Always stop, even when an assertion above fails.
            core.stop()

    def test_step_execution(self):
        """A single ``step`` advances the tick counter by exactly one."""
        config = SimulationConfig(initial_cells=0, initial_food=0)
        core = SimulationCore(config)

        initial_ticks = core.state.total_ticks
        core.step()

        # Should have advanced by exactly one tick
        assert core.state.total_ticks == initial_ticks + 1

    def test_tps_control(self):
        """TPS and speed-multiplier setters are forwarded to the timing state."""
        config = SimulationConfig(initial_cells=0, initial_food=0)
        core = SimulationCore(config)

        core.set_tps(120.0)
        assert core.timing.state.tps == 120.0

        core.set_speed_multiplier(2.0)
        assert core.timing.state.speed_multiplier == 2.0

    def test_entity_queries(self):
        """Entity counts match the config and radius queries stay within them."""
        config = SimulationConfig(
            grid_width=10,
            grid_height=10,
            initial_cells=5,
            initial_food=3,
        )
        core = SimulationCore(config)

        # Test entity counting
        from world.objects import DefaultCell, FoodObject

        cell_count = core.count_entities_by_type(DefaultCell)
        food_count = core.count_entities_by_type(FoodObject)

        assert cell_count == 5
        assert food_count == 3

        # Test position queries
        from world.world import Position

        position = Position(x=0, y=0)
        entities_in_radius = core.get_entities_in_radius(position, radius=50)

        # The previous check (``len(...) >= 0``) could never fail. A radius
        # query cannot return more entities than the world holds.
        # NOTE(review): whether radius=50 must cover the whole 10x10 grid
        # depends on the world's coordinate units - confirm, then tighten
        # this to an exact count if it does.
        assert len(entities_in_radius) <= cell_count + food_count

    def test_world_state(self):
        """The world-state snapshot exposes tick, TPS and per-type entity counts."""
        config = SimulationConfig(initial_cells=2, initial_food=3)
        core = SimulationCore(config)

        state = core.get_world_state()

        assert 'tick_count' in state
        assert 'actual_tps' in state
        assert 'entity_counts' in state
        assert state['entity_counts']['total'] == 5  # 2 cells + 3 food
        assert state['entity_counts']['cells'] == 2
        assert state['entity_counts']['food'] == 3

    def test_entity_states(self):
        """Entity snapshots carry the expected keys for cells and food."""
        config = SimulationConfig(initial_cells=2, initial_food=1)
        core = SimulationCore(config)

        entities = core.get_entity_states()

        assert len(entities) == 3  # 2 cells + 1 food

        # Check cell entities
        cell_entities = [e for e in entities if e['type'] == 'cell']
        assert len(cell_entities) == 2

        for cell in cell_entities:
            assert 'id' in cell
            assert 'position' in cell
            assert 'energy' in cell
            assert 'age' in cell
            assert 'neural_network' in cell

        # Check food entities
        food_entities = [e for e in entities if e['type'] == 'food']
        assert len(food_entities) == 1

        for food in food_entities:
            assert 'id' in food
            assert 'position' in food
            assert 'decay' in food

    def test_sprint_mode(self):
        """``toggle_sprint_mode`` flips the timing controller's sprint flag."""
        config = SimulationConfig(initial_cells=0, initial_food=0)
        core = SimulationCore(config)

        assert not core.timing.state.sprint_mode

        core.toggle_sprint_mode()
        assert core.timing.state.sprint_mode

        core.toggle_sprint_mode()
        assert not core.timing.state.sprint_mode