Add unit tests for configuration and core simulation components
Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 1m46s

This commit is contained in:
Sam 2025-11-08 19:45:35 -06:00
parent 3a34759094
commit 2daf5f7b19
4 changed files with 914 additions and 0 deletions

111
tests/test_config.py Normal file
View File

@ -0,0 +1,111 @@
"""Tests for configuration system."""
import pytest
import json
import tempfile
import os
from config.simulation_config import SimulationConfig, HeadlessConfig, InteractiveConfig, ExperimentConfig
from config.config_loader import ConfigLoader
class TestSimulationConfig:
"""Test simulation configuration."""
def test_custom_values(self):
"""Test custom configuration values."""
config = SimulationConfig(
grid_width=50,
grid_height=40,
cell_size=15,
initial_cells=100,
initial_food=200,
food_spawning=False,
random_seed=999,
default_tps=120.0
)
assert config.grid_width == 50
assert config.grid_height == 40
assert config.cell_size == 15
assert config.initial_cells == 100
assert config.initial_food == 200
assert config.food_spawning == False
assert config.random_seed == 999
assert config.default_tps == 120.0
class TestConfigLoader:
"""Test configuration loader."""
def test_load_headless_config(self):
"""Test loading headless configuration from file."""
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
temp_file = f.name
json.dump({
"max_ticks": 10000,
"max_duration": 300.0,
"output": {
"enabled": True,
"directory": "test_output",
"formats": ["json", "csv"]
},
"simulation": {
"grid_width": 50,
"grid_height": 40,
"initial_cells": 100,
"default_tps": 60.0
}
}, f)
try:
config = ConfigLoader.load_headless_config(temp_file)
assert config.max_ticks == 10000
assert config.max_duration == 300.0
assert config.output.enabled == True
assert config.output.directory == "test_output"
assert config.output.formats == ["json", "csv"]
assert config.simulation.grid_width == 50
assert config.simulation.grid_height == 40
assert config.simulation.initial_cells == 100
assert config.simulation.default_tps == 60.0
finally:
os.unlink(temp_file)
def test_save_load_json(self):
"""Test saving and loading configuration from JSON file."""
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
temp_file = f.name
try:
# Create test configuration
original_config = HeadlessConfig(
max_ticks=5000,
simulation=SimulationConfig(
grid_width=25,
grid_height=20,
initial_cells=75,
default_tps=90.0
)
)
# Save to JSON
ConfigLoader.save_config(original_config, temp_file)
# Load from JSON
loaded_config = ConfigLoader.load_headless_config(temp_file)
# Verify loaded config matches original
assert loaded_config.max_ticks == original_config.max_ticks
assert loaded_config.simulation.grid_width == original_config.simulation.grid_width
assert loaded_config.simulation.grid_height == original_config.simulation.grid_height
assert loaded_config.simulation.initial_cells == original_config.simulation.initial_cells
assert loaded_config.simulation.default_tps == original_config.simulation.default_tps
finally:
# Clean up temporary file
if os.path.exists(temp_file):
os.unlink(temp_file)

View File

@ -0,0 +1,390 @@
"""Tests for core simulation components."""
import pytest
import time
from unittest.mock import Mock
from core.simulation_core import SimulationCore, SimulationState
from core.timing import TimingController, TimingState
from core.event_bus import EventBus, EventType, Event
from config.simulation_config import SimulationConfig
class TestTimingController:
"""Test timing controller functionality."""
def test_initialization(self):
"""Test timing controller initialization."""
event_bus = Mock()
timing = TimingController(default_tps=60.0, event_bus=event_bus)
assert timing.state.tps == 60.0
assert timing.state.is_paused == False
assert timing.state.sprint_mode == False
assert timing.state.speed_multiplier == 1.0
assert timing.default_tps == 60.0
assert timing.total_ticks == 0
def test_set_tps(self):
"""Test TPS setting."""
timing = TimingController(default_tps=60.0)
timing.set_tps(120.0)
assert timing.state.tps == 120.0
assert timing.state.speed_multiplier == 2.0
# Test bounds
timing.set_tps(2000.0) # Should be capped at 1000
assert timing.state.tps == 1000.0
timing.set_tps(0.5) # Should be capped at 1.0
assert timing.state.tps == 1.0
def test_set_speed_multiplier(self):
"""Test speed multiplier setting."""
timing = TimingController(default_tps=60.0)
timing.set_speed_multiplier(2.0)
assert timing.state.speed_multiplier == 2.0
assert timing.state.tps == 120.0
# Test bounds
timing.set_speed_multiplier(20.0) # Should be capped at 10.0
assert timing.state.speed_multiplier == 10.0
timing.set_speed_multiplier(0.05) # Should be capped at 0.1
assert timing.state.speed_multiplier == 0.1
def test_pause_toggle(self):
"""Test pause functionality."""
timing = TimingController()
assert timing.state.is_paused == False
timing.toggle_pause()
assert timing.state.is_paused == True
timing.set_pause(False) # This will unpause since it's different
assert timing.state.is_paused == False
timing.set_pause(True) # This will pause since it's different
assert timing.state.is_paused == True
timing.toggle_pause()
assert timing.state.is_paused == False
def test_sprint_mode(self):
"""Test sprint mode functionality."""
timing = TimingController()
assert timing.state.sprint_mode == False
timing.toggle_sprint_mode()
assert timing.state.sprint_mode == True
assert timing.sprint_start_time is not None
timing.toggle_sprint_mode()
assert timing.state.sprint_mode == False
assert timing.sprint_start_time is None
def test_should_tick(self):
"""Test tick timing logic."""
timing = TimingController(default_tps=60.0)
# When paused, should not tick
timing.state.is_paused = True
assert timing.should_tick() == False
# When in sprint mode, should always tick
timing.state.is_paused = False
timing.state.sprint_mode = True
assert timing.should_tick() == True
# Normal mode - should tick based on time
timing.state.sprint_mode = False
timing.last_tick_time = time.perf_counter()
time.sleep(0.02) # 20ms, should be enough for 60 TPS (16.67ms interval)
assert timing.should_tick() == True
def test_update_timing(self):
"""Test timing update calculations."""
timing = TimingController(default_tps=60.0)
initial_ticks = timing.total_ticks
timing.update_timing()
assert timing.total_ticks == initial_ticks + 1
assert timing.tick_counter == 1
def test_display_functions(self):
"""Test display utility functions."""
timing = TimingController(default_tps=60.0)
# Test TPS display
timing.actual_tps = 59.7
assert timing.get_display_tps() == 60
# Test speed display
assert timing.get_current_speed_display() == "1x"
timing.state.is_paused = True
assert timing.get_current_speed_display() == "Paused"
timing.state.is_paused = False
timing.state.sprint_mode = True
assert timing.get_current_speed_display() == "Sprint"
timing.state.sprint_mode = False
timing.state.speed_multiplier = 2.0
assert timing.get_current_speed_display() == "2.0x"
class TestEventBus:
"""Test event bus functionality."""
def test_initialization(self):
"""Test event bus initialization."""
bus = EventBus()
assert len(bus._subscribers) == 0
assert len(bus._event_history) == 0
def test_subscribe_unsubscribe(self):
"""Test subscribing and unsubscribing to events."""
bus = EventBus()
callback = Mock()
bus.subscribe(EventType.SIMULATION_STATE_CHANGED, callback)
assert EventType.SIMULATION_STATE_CHANGED in bus._subscribers
assert callback in bus._subscribers[EventType.SIMULATION_STATE_CHANGED]
bus.unsubscribe(EventType.SIMULATION_STATE_CHANGED, callback)
assert callback not in bus._subscribers[EventType.SIMULATION_STATE_CHANGED]
def test_publish_and_receive(self):
"""Test publishing and receiving events."""
bus = EventBus()
callback = Mock()
bus.subscribe(EventType.WORLD_TICK_COMPLETED, callback)
event = Event(type=EventType.WORLD_TICK_COMPLETED, data={'tick_count': 100})
bus.publish(event)
callback.assert_called_once_with(event)
assert len(bus._event_history) == 1
assert bus._event_history[0] == event
def test_multiple_subscribers(self):
"""Test multiple subscribers to same event."""
bus = EventBus()
callback1 = Mock()
callback2 = Mock()
bus.subscribe(EventType.ENTITY_ADDED, callback1)
bus.subscribe(EventType.ENTITY_ADDED, callback2)
event = Event(type=EventType.ENTITY_ADDED, data={'entity_id': 123})
bus.publish(event)
callback1.assert_called_once_with(event)
callback2.assert_called_once_with(event)
def test_event_history(self):
"""Test event history management."""
bus = EventBus()
# Add some events
for i in range(5):
event = Event(type=EventType.TIMING_UPDATE, data={'tick': i})
bus.publish(event)
assert len(bus._event_history) == 5
# Test getting recent events
recent = bus.get_recent_events(count=3)
assert len(recent) == 3
assert recent[0].data['tick'] == 2 # Third from last
assert recent[2].data['tick'] == 4 # Most recent
# Test filtered events
timing_events = bus.get_recent_events(event_type=EventType.TIMING_UPDATE)
assert len(timing_events) == 5
# Clear history
bus.clear_history()
assert len(bus._event_history) == 0
def test_callback_error_handling(self):
"""Test that callback errors don't crash the event bus."""
bus = EventBus()
good_callback = Mock()
def bad_callback(event):
raise ValueError("Test error")
bus.subscribe(EventType.SIMULATION_STATE_CHANGED, good_callback)
bus.subscribe(EventType.SIMULATION_STATE_CHANGED, bad_callback)
# This should not raise an exception despite the bad callback
event = Event(type=EventType.SIMULATION_STATE_CHANGED, data={})
bus.publish(event) # Should not crash
# Good callback should still be called
good_callback.assert_called_once()
class TestSimulationCore:
"""Test simulation core functionality."""
def test_initialization(self):
"""Test simulation core initialization."""
config = SimulationConfig(
grid_width=20,
grid_height=15,
initial_cells=5,
initial_food=10
)
core = SimulationCore(config)
assert core.config == config
assert core.world is not None
assert core.timing is not None
assert core.event_bus is not None
assert core.state.total_ticks == 0
assert core.is_running == False
def test_start_stop(self):
"""Test starting and stopping simulation."""
config = SimulationConfig(initial_cells=0, initial_food=0)
core = SimulationCore(config)
assert core.is_running == False
assert core.state.is_running == False
core.start()
assert core.is_running == True
assert core.state.is_running == True
core.stop()
assert core.is_running == False
assert core.state.is_running == False
def test_pause_resume(self):
"""Test pause and resume functionality."""
config = SimulationConfig(initial_cells=0, initial_food=0)
core = SimulationCore(config)
core.start()
# Pause
core.pause()
assert core.timing.state.is_paused == True
# Resume
core.resume()
assert core.timing.state.is_paused == False
core.stop()
def test_step_execution(self):
"""Test single step execution."""
config = SimulationConfig(initial_cells=0, initial_food=0)
core = SimulationCore(config)
initial_ticks = core.state.total_ticks
core.step()
# Should have advanced by exactly one tick
assert core.state.total_ticks == initial_ticks + 1
def test_tps_control(self):
"""Test TPS control functionality."""
config = SimulationConfig(initial_cells=0, initial_food=0)
core = SimulationCore(config)
core.set_tps(120.0)
assert core.timing.state.tps == 120.0
core.set_speed_multiplier(2.0)
assert core.timing.state.speed_multiplier == 2.0
def test_entity_queries(self):
"""Test entity query methods."""
config = SimulationConfig(
grid_width=10,
grid_height=10,
initial_cells=5,
initial_food=3
)
core = SimulationCore(config)
# Test entity counting
from world.objects import DefaultCell, FoodObject
cell_count = core.count_entities_by_type(DefaultCell)
food_count = core.count_entities_by_type(FoodObject)
assert cell_count == 5
assert food_count == 3
# Test position queries
from world.world import Position
position = Position(x=0, y=0)
entities_in_radius = core.get_entities_in_radius(position, radius=50)
assert len(entities_in_radius) >= 0 # Should find some entities
def test_world_state(self):
"""Test world state collection."""
config = SimulationConfig(initial_cells=2, initial_food=3)
core = SimulationCore(config)
state = core.get_world_state()
assert 'tick_count' in state
assert 'actual_tps' in state
assert 'entity_counts' in state
assert state['entity_counts']['total'] == 5 # 2 cells + 3 food
assert state['entity_counts']['cells'] == 2
assert state['entity_counts']['food'] == 3
def test_entity_states(self):
"""Test entity state collection."""
config = SimulationConfig(initial_cells=2, initial_food=1)
core = SimulationCore(config)
entities = core.get_entity_states()
assert len(entities) == 3 # 2 cells + 1 food
# Check cell entities
cell_entities = [e for e in entities if e['type'] == 'cell']
assert len(cell_entities) == 2
for cell in cell_entities:
assert 'id' in cell
assert 'position' in cell
assert 'energy' in cell
assert 'age' in cell
assert 'neural_network' in cell
# Check food entities
food_entities = [e for e in entities if e['type'] == 'food']
assert len(food_entities) == 1
for food in food_entities:
assert 'id' in food
assert 'position' in food
assert 'decay' in food
def test_sprint_mode(self):
"""Test sprint mode functionality."""
config = SimulationConfig(initial_cells=0, initial_food=0)
core = SimulationCore(config)
assert core.timing.state.sprint_mode == False
core.toggle_sprint_mode()
assert core.timing.state.sprint_mode == True
core.toggle_sprint_mode()
assert core.timing.state.sprint_mode == False

View File

@ -0,0 +1,218 @@
"""Tests for headless simulation engine."""
import time
from unittest.mock import Mock, patch
from engines.headless_engine import HeadlessSimulationEngine, HeadlessConfig
from config.simulation_config import SimulationConfig
class TestHeadlessConfig:
"""Test headless configuration."""
def test_custom_values(self):
"""Test custom configuration values."""
sim_config = SimulationConfig(initial_cells=50, default_tps=120.0)
config = HeadlessConfig(
simulation=sim_config,
max_ticks=10000,
max_duration=300.0,
output_dir="custom_output",
enable_metrics=False,
enable_entities=True,
enable_evolution=False,
metrics_interval=50,
entities_interval=500,
evolution_interval=2000,
output_formats=['json', 'csv'],
real_time=True
)
assert config.simulation == sim_config
assert config.max_ticks == 10000
assert config.max_duration == 300.0
assert config.output_dir == "custom_output"
assert config.enable_metrics == False
assert config.enable_entities == True
assert config.enable_evolution == False
assert config.metrics_interval == 50
assert config.entities_interval == 500
assert config.evolution_interval == 2000
assert config.output_formats == ['json', 'csv']
assert config.real_time == True
class TestHeadlessSimulationEngine:
"""Test headless simulation engine."""
def test_initialization(self):
"""Test engine initialization."""
sim_config = SimulationConfig(initial_cells=5, initial_food=10)
config = HeadlessConfig(
simulation=sim_config,
max_ticks=1000,
output_formats=['json']
)
engine = HeadlessSimulationEngine(config)
assert engine.config == config
assert engine.event_bus is not None
assert engine.simulation_core is not None
assert engine.file_writer is not None
assert engine.formatters is not None
assert engine.collectors is not None
assert engine.running == False
assert engine.start_time is None
def test_collectors_creation(self):
"""Test collectors are created based on configuration."""
sim_config = SimulationConfig()
config = HeadlessConfig(
simulation=sim_config,
enable_metrics=True,
enable_entities=True,
enable_evolution=False,
metrics_interval=50,
entities_interval=200
)
engine = HeadlessSimulationEngine(config)
assert 'metrics' in engine.collectors
assert 'entities' in engine.collectors
assert 'evolution' not in engine.collectors
assert engine.collectors['metrics'].collection_interval == 50
assert engine.collectors['entities'].collection_interval == 200
def test_formatters_creation(self):
"""Test formatters are created based on configuration."""
sim_config = SimulationConfig()
config = HeadlessConfig(
simulation=sim_config,
output_formats=['json', 'csv']
)
engine = HeadlessSimulationEngine(config)
assert 'json' in engine.formatters
assert 'csv' in engine.formatters
def test_should_terminate_max_ticks(self):
"""Test termination condition for max ticks."""
sim_config = SimulationConfig()
config = HeadlessConfig(simulation=sim_config, max_ticks=100)
engine = HeadlessSimulationEngine(config)
engine.running = True
engine.start_time = time.time()
# Mock the simulation core to report tick count
engine.simulation_core.state.total_ticks = 99
assert engine._should_terminate() == False
engine.simulation_core.state.total_ticks = 100
assert engine._should_terminate() == True
def test_should_terminate_max_duration(self):
"""Test termination condition for max duration."""
sim_config = SimulationConfig()
config = HeadlessConfig(simulation=sim_config, max_duration=1.0)
engine = HeadlessSimulationEngine(config)
engine.running = True
engine.start_time = time.time()
# Should not terminate immediately
assert engine._should_terminate() == False
# Mock time passage
with patch('time.time', return_value=engine.start_time + 1.5):
assert engine._should_terminate() == True
def test_should_terminate_no_limits(self):
"""Test no termination conditions."""
sim_config = SimulationConfig()
config = HeadlessConfig(simulation=sim_config)
engine = HeadlessSimulationEngine(config)
engine.running = True
engine.start_time = time.time()
# Should never terminate without limits
assert engine._should_terminate() == False
def test_collect_data(self):
"""Test data collection from collectors."""
sim_config = SimulationConfig()
config = HeadlessConfig(
simulation=sim_config,
enable_metrics=True,
enable_entities=False,
enable_evolution=False
)
engine = HeadlessSimulationEngine(config)
# Mock simulation core's get_world_state method
engine.simulation_core.get_world_state = Mock(return_value={
'tick_count': 1000,
'actual_tps': 60.0,
'entity_counts': {'total': 25}
})
# Mock collector
mock_collector = Mock()
mock_collector.update.return_value = [
{'tick_count': 1000, 'actual_tps': 60.0, 'collection_type': 'metrics'}
]
engine.collectors['metrics'] = mock_collector
engine._collect_data()
# Verify collector was called
mock_collector.update.assert_called_once()
# Verify data was collected
assert len(engine.batch_data['metrics']) == 1
assert engine.batch_data['metrics'][0]['tick_count'] == 1000
def test_get_real_time_status(self):
"""Test real-time status reporting."""
sim_config = SimulationConfig()
config = HeadlessConfig(simulation=sim_config)
engine = HeadlessSimulationEngine(config)
engine.running = True
engine.start_time = time.time() - 5.0
# Mock simulation state
engine.simulation_core.state.total_ticks = 300
engine.simulation_core.state.actual_tps = 60.0
engine.simulation_core.get_world_state = Mock(return_value={
'tick_count': 300,
'actual_tps': 60.0,
'entity_counts': {'total': 50}
})
status = engine.get_real_time_status()
assert status['running'] == True
assert status['ticks'] == 300
assert status['tps'] == 60.0
assert status['duration'] > 4.0 # Approximately 5 seconds
assert status['world_state']['tick_count'] == 300
def test_signal_handler(self):
"""Test signal handling for graceful shutdown."""
sim_config = SimulationConfig()
config = HeadlessConfig(simulation=sim_config)
engine = HeadlessSimulationEngine(config)
engine.running = True
# Simulate signal handler
engine._signal_handler(2, None) # SIGINT
assert engine.running == False

195
tests/test_output_system.py Normal file
View File

@ -0,0 +1,195 @@
"""Tests for output collection system."""
import json
import tempfile
import os
from unittest.mock import Mock
from output.collectors.metrics_collector import MetricsCollector
from output.collectors.entity_collector import EntityCollector
from output.formatters.json_formatter import JSONFormatter
from output.formatters.csv_formatter import CSVFormatter
from output.writers.file_writer import FileWriter
class TestMetricsCollector:
"""Test metrics collector functionality."""
def test_collect_data(self):
"""Test metrics data collection."""
collector = MetricsCollector(collection_interval=100)
# Create mock simulation core
mock_sim_core = Mock()
mock_sim_core.get_world_state.return_value = {
'tick_count': 1500,
'actual_tps': 58.5,
'target_tps': 60.0,
'speed_multiplier': 1.0,
'is_paused': False,
'sprint_mode': False,
'world_buffer': 1,
'entity_counts': {
'total': 25,
'cells': 20,
'food': 5
}
}
# Mock timing
mock_sim_core.timing = Mock()
mock_sim_core.timing.last_tick_time = 1234567890.5
data = collector.collect(mock_sim_core)
assert data['tick_count'] == 1500
assert data['actual_tps'] == 58.5
assert data['target_tps'] == 60.0
assert data['speed_multiplier'] == 1.0
assert data['is_paused'] == False
assert data['sprint_mode'] == False
assert data['world_buffer'] == 1
assert data['entity_counts'] == {
'total': 25,
'cells': 20,
'food': 5
}
assert data['collection_type'] == 'metrics'
assert data['timestamp'] == 1234567890.5
class TestEntityCollector:
"""Test entity collector functionality."""
def test_collect_cells_only(self):
"""Test collecting only cell entities."""
collector = EntityCollector(
collection_interval=1000,
include_cells=True,
include_food=False
)
# Create mock simulation core with entity states
mock_sim_core = Mock()
mock_sim_core.get_world_state.return_value = {
'tick_count': 1000
}
mock_sim_core.get_entity_states.return_value = [
{
'id': 1,
'type': 'cell',
'position': {'x': 10, 'y': 20},
'energy': 75.5,
'age': 150,
'generation': 3,
'neural_network': {'layer_sizes': [4, 6, 2]}
},
{
'id': 2,
'type': 'food',
'position': {'x': 30, 'y': 40},
'decay': 50,
'max_decay': 100
}
]
data = collector.collect(mock_sim_core)
assert len(data['entities']) == 1 # Only cell included
assert data['entities'][0]['type'] == 'cell'
assert data['entities'][0]['id'] == 1
assert data['collection_type'] == 'entities'
class TestJSONFormatter:
"""Test JSON formatter functionality."""
def test_format_data(self):
"""Test JSON data formatting."""
formatter = JSONFormatter()
test_data = {
'tick_count': 1000,
'actual_tps': 58.5,
'entity_counts': {
'cells': 20,
'food': 5
}
}
formatted = formatter.format(test_data)
assert isinstance(formatted, str)
# Verify it's valid JSON
parsed = json.loads(formatted)
assert parsed['tick_count'] == 1000
assert parsed['actual_tps'] == 58.5
assert parsed['entity_counts']['cells'] == 20
class TestCSVFormatter:
"""Test CSV formatter functionality."""
def test_format_simple_data(self):
"""Test CSV formatting for simple data."""
formatter = CSVFormatter()
test_data = {
'tick_count': 1000,
'actual_tps': 58.5,
'is_paused': False
}
formatted = formatter.format(test_data)
assert isinstance(formatted, str)
lines = formatted.strip().split('\n')
# Should have header and one data row
assert len(lines) == 2
# Check header
header = lines[0]
assert 'tick_count' in header
assert 'actual_tps' in header
assert 'is_paused' in header
# Check data row
data_row = lines[1]
assert '1000' in data_row
assert '58.5' in data_row
def test_get_file_extension(self):
"""Test file extension."""
formatter = CSVFormatter()
assert formatter.get_file_extension() == 'csv'
class TestFileWriter:
"""Test file writer functionality."""
def test_write_data(self):
"""Test writing data to file."""
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
writer = FileWriter(temp_dir)
# Should be ready now
assert writer.is_ready() == True
# Write test data
test_data = '{"tick_count": 1000, "actual_tps": 58.5}'
filename = "test_data.json"
writer.write(test_data, filename)
# Verify file was created
file_path = os.path.join(temp_dir, filename)
assert os.path.exists(file_path)
# Verify file content
with open(file_path, 'r') as f:
content = f.read()
assert content == test_data