DynamicAbstractionSystem/engines/headless_engine.py
Sam 3a34759094
Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 8m17s
Add core simulation components and configuration classes
Major rewrite.
2025-11-08 19:17:40 -06:00

274 lines
10 KiB
Python

"""Headless simulation engine for running simulations without UI."""
import time
import signal
import sys
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from core.simulation_core import SimulationCore, SimulationConfig
from core.event_bus import EventBus
from output import MetricsCollector, EntityCollector, EvolutionCollector
from output.formatters.json_formatter import JSONFormatter
from output.formatters.csv_formatter import CSVFormatter
from output.writers.file_writer import FileWriter
@dataclass
class HeadlessConfig:
    """Configuration for headless simulation engine.

    Plain settings container consumed by ``HeadlessSimulationEngine``.
    ``output_formats`` defaults to ``['json']`` when left as ``None``; a fresh
    list is created per instance so configs never share the default.
    """

    # Settings handed to SimulationCore when the engine is constructed.
    simulation: SimulationConfig
    # Stop once this many ticks have run; None (or 0) means unlimited.
    max_ticks: Optional[int] = None
    # Stop after this many wall-clock seconds; None (or 0) means unlimited.
    max_duration: Optional[float] = None  # seconds
    # Directory passed to the FileWriter that receives all output files.
    output_dir: str = "simulation_output"
    # Toggles for the three data collectors the engine can create.
    enable_metrics: bool = True
    enable_entities: bool = True
    enable_evolution: bool = True
    # Interval passed to each collector's constructor
    # (presumably measured in ticks -- TODO confirm against the collectors).
    metrics_interval: int = 100
    entities_interval: int = 1000
    evolution_interval: int = 1000
    # Formats to emit; the engine recognises 'json' and 'csv'.
    output_formats: Optional[List[str]] = None  # ['json', 'csv']
    # Whether to run in real-time or as fast as possible.
    real_time: bool = False

    def __post_init__(self):
        """Fill in the default output format list when none was supplied."""
        if self.output_formats is None:
            self.output_formats = ['json']
class HeadlessSimulationEngine:
    """Headless simulation engine with data collection capabilities.

    Drives a ``SimulationCore`` without any UI. Each loop iteration the
    enabled collectors are polled; their records are buffered in
    ``batch_data`` and periodically flushed through every configured
    formatter to the ``FileWriter``.
    """

    # Fixed timestep passed to SimulationCore.update() outside max-speed mode;
    # also the per-iteration sleep in real-time mode (~60 FPS).
    FRAME_INTERVAL = 0.016
    # Wall-clock seconds between periodic flushes of buffered collector data.
    BATCH_WRITE_INTERVAL = 5.0

    def __init__(self, config: HeadlessConfig):
        """Build the simulation core, writer, formatters and collectors.

        Also installs SIGINT/SIGTERM handlers that request a graceful stop.
        The handlers that were active before are remembered and put back by
        ``_finalize``.
        """
        self.config = config
        self.event_bus = EventBus()
        self.simulation_core = SimulationCore(config.simulation, self.event_bus)
        self.file_writer = FileWriter(config.output_dir)
        self.formatters = self._create_formatters()
        self.collectors = self._create_collectors()

        # Runtime state
        self.running = False
        self.start_time = None
        self.tick_data = {}
        self.batch_data = {
            'metrics': [],
            'entities': [],
            'evolution': [],
        }

        # Setup signal handlers for graceful shutdown
        self._previous_signal_handlers = {}
        self._install_signal_handlers()

    def _install_signal_handlers(self):
        """Route SIGINT/SIGTERM to ``_signal_handler``, remembering the old handlers.

        Safe to call more than once: only the first handler seen for each
        signal is remembered as the one to restore.
        """
        for signum in (signal.SIGINT, signal.SIGTERM):
            try:
                previous = signal.signal(signum, self._signal_handler)
            except ValueError:
                # signal.signal() may only be called from the main thread.
                # An engine built elsewhere simply runs without handlers
                # instead of failing to construct.
                continue
            self._previous_signal_handlers.setdefault(signum, previous)

    def _restore_signal_handlers(self):
        """Put back the handlers that were active before this engine took over."""
        while self._previous_signal_handlers:
            signum, previous = self._previous_signal_handlers.popitem()
            if previous is None:
                # The earlier handler was not installed from Python and
                # cannot be re-installed through signal.signal().
                continue
            if signal.getsignal(signum) != self._signal_handler:
                # Someone replaced our handler in the meantime; leave theirs.
                continue
            try:
                signal.signal(signum, previous)
            except ValueError:
                # Not on the main thread: handlers cannot be changed from
                # here, so the engine's handler stays in place.
                pass

    def _create_formatters(self) -> Dict[str, Any]:
        """Create data formatters.

        Returns:
            Mapping of format name ('json' / 'csv') to a formatter instance,
            one entry per recognised name in ``config.output_formats``.
        """
        requested = self.config.output_formats
        formatters = {}
        if 'json' in requested:
            formatters['json'] = JSONFormatter()
        if 'csv' in requested:
            formatters['csv'] = CSVFormatter()
        return formatters

    def _create_collectors(self) -> Dict[str, Any]:
        """Create data collectors.

        Returns:
            Mapping of collector name ('metrics' / 'entities' / 'evolution')
            to a collector instance, one entry per enabled collector.
        """
        config = self.config
        collectors = {}
        if config.enable_metrics:
            collectors['metrics'] = MetricsCollector(config.metrics_interval)
        if config.enable_entities:
            collectors['entities'] = EntityCollector(
                config.entities_interval,
                include_cells=True,
                include_food=False,
            )
        if config.enable_evolution:
            collectors['evolution'] = EvolutionCollector(config.evolution_interval)
        return collectors

    def run(self) -> Dict[str, Any]:
        """Run the headless simulation.

        Loops until a termination condition is met or a shutdown signal
        clears ``running``. Errors raised inside the loop are printed with a
        traceback rather than propagated. ``_finalize`` always runs.

        Returns:
            The summary dictionary produced by ``_get_summary``.
        """
        config = self.config
        # Determine if we should run at max speed
        max_speed_mode = not config.real_time and config.simulation.default_tps >= 1000
        speed_label = 'Maximum speed' if max_speed_mode else f'{config.simulation.default_tps} TPS'

        print("Starting headless simulation...")
        print(f"Output directory: {config.output_dir}")
        print(f"Max ticks: {config.max_ticks or 'unlimited'}")
        print(f"Max duration: {config.max_duration or 'unlimited'} seconds")
        print(f"Real-time mode: {config.real_time}")
        print(f"Speed mode: {speed_label}")
        print(f"Output formats: {', '.join(config.output_formats)}")
        print(f"Collectors: {', '.join(self.collectors.keys())}")
        print()

        # Handlers are restored by _finalize(); re-arm them so every run is
        # interruptible (no-op if they are still installed).
        self._install_signal_handlers()

        self.running = True
        self.start_time = time.time()
        self.simulation_core.start()

        # Enable sprint mode for maximum speed if not real-time mode
        if max_speed_mode:
            self.simulation_core.timing.set_sprint_mode(True)
            print("Running at maximum speed (sprint mode enabled)")

        # In max speed mode the core is updated with a zero timestep;
        # otherwise it gets a fixed ~60 FPS step.
        step = 0.0 if max_speed_mode else self.FRAME_INTERVAL
        last_batch_time = time.time()

        try:
            while self.running and not self._should_terminate():
                self.simulation_core.update(step)
                self._collect_data()

                # Write batch data periodically
                if time.time() - last_batch_time >= self.BATCH_WRITE_INTERVAL:
                    self._write_batch_data()
                    last_batch_time = time.time()

                # Real-time delay if needed
                if config.real_time:
                    time.sleep(self.FRAME_INTERVAL)
        except KeyboardInterrupt:
            print("\nSimulation interrupted by user")
        except Exception as e:
            print(f"Simulation error: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self._finalize()

        return self._get_summary()

    def _should_terminate(self) -> bool:
        """Check if simulation should terminate.

        A ``max_ticks`` / ``max_duration`` of ``None`` or ``0`` disables the
        corresponding limit.
        """
        max_ticks = self.config.max_ticks
        if max_ticks and self.simulation_core.state.total_ticks >= max_ticks:
            print(f"Reached max ticks: {max_ticks}")
            return True

        max_duration = self.config.max_duration
        # start_time is None until run() starts; no time has elapsed then.
        if max_duration and self.start_time is not None:
            elapsed = time.time() - self.start_time
            if elapsed >= max_duration:
                print(f"Reached max duration: {max_duration} seconds")
                return True

        return False

    def _collect_data(self):
        """Poll every collector and buffer whatever records it returns."""
        for collector_name, collector in self.collectors.items():
            self.batch_data[collector_name].extend(collector.update(self.simulation_core))

    def _group_by_tick(self, data_list):
        """Group records by tick, preserving first-seen tick order.

        The tick is taken from 'tick_count', then 'tick', falling back to the
        simulation's current total tick count.
        """
        data_by_tick = {}
        for data in data_list:
            tick = data.get(
                'tick_count',
                data.get('tick', self.simulation_core.state.total_ticks),
            )
            data_by_tick.setdefault(tick, []).append(data)
        return data_by_tick

    def _write_batch_data(self):
        """Write collected data to files.

        One file per collector, tick and format is written, named
        ``<collector>_tick<tick>.<ext>``. Nothing happens (and the buffers
        are kept) while the file writer reports it is not ready.
        """
        if not self.file_writer.is_ready():
            return

        for collector_name, data_list in self.batch_data.items():
            if not data_list:
                continue

            if self.formatters:
                # The grouping does not depend on the formatter: do it once.
                data_by_tick = self._group_by_tick(data_list)
                for formatter in self.formatters.values():
                    extension = formatter.get_file_extension()
                    for tick, tick_data in data_by_tick.items():
                        filename = f"{collector_name}_tick{tick}.{extension}"
                        if len(tick_data) == 1:
                            combined_data = tick_data[0]
                        else:
                            # Several records share this tick: wrap them in
                            # one envelope so nothing is dropped.
                            combined_data = {
                                'timestamp': tick_data[0].get('timestamp'),
                                'tick_count': tick,
                                'collection_type': collector_name,
                                'multiple_entries': len(tick_data),
                                'data': tick_data,
                            }
                        formatted_data = formatter.format(combined_data)
                        self.file_writer.write(formatted_data, filename)

            # Clear written data
            data_list.clear()

        print(f"Wrote batch data at tick {self.simulation_core.state.total_ticks}")

    def _finalize(self):
        """Finalize simulation and write remaining data.

        The simulation is stopped, the file writer closed and the signal
        handlers restored even when flushing or writing the summary fails;
        the original error still propagates.
        """
        print("Finalizing simulation...")

        try:
            # Write any remaining data
            self._write_batch_data()

            # Write final summary
            summary = self._get_summary()
            if 'json' in self.formatters:
                summary_data = self.formatters['json'].format(summary)
                self.file_writer.write(summary_data, "simulation_summary.json")
        finally:
            try:
                # Stop simulation
                self.simulation_core.stop()
            finally:
                self.file_writer.close()
                self._restore_signal_handlers()

        print("Simulation completed")

    def _get_summary(self) -> Dict[str, Any]:
        """Get simulation summary.

        Returns:
            A dictionary with the sections 'simulation_config', 'runtime',
            'final_state' and 'data_collection'.
        """
        duration = time.time() - self.start_time if self.start_time else 0
        world_state = self.simulation_core.get_world_state()
        sim_config = self.config.simulation
        state = self.simulation_core.state

        return {
            'simulation_config': {
                'grid_width': sim_config.grid_width,
                'grid_height': sim_config.grid_height,
                'initial_cells': sim_config.initial_cells,
                'initial_food': sim_config.initial_food,
                'default_tps': sim_config.default_tps,
            },
            'runtime': {
                'duration_seconds': duration,
                'total_ticks': state.total_ticks,
                # Guard against division by zero when no time has elapsed.
                'average_tps': state.total_ticks / duration if duration > 0 else 0,
                'final_actual_tps': state.actual_tps,
            },
            'final_state': world_state,
            'data_collection': {
                'collectors_used': list(self.collectors.keys()),
                'output_formats': self.config.output_formats,
                'output_directory': self.config.output_dir,
            },
        }

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals by asking the run loop to stop."""
        print(f"\nReceived signal {signum}, shutting down gracefully...")
        self.running = False

    def get_real_time_status(self) -> Dict[str, Any]:
        """Get current simulation status (useful for monitoring)."""
        state = self.simulation_core.state
        return {
            'running': self.running,
            'ticks': state.total_ticks,
            'tps': state.actual_tps,
            'duration': time.time() - self.start_time if self.start_time else 0,
            'world_state': self.simulation_core.get_world_state(),
        }