"""Headless simulation engine for running simulations without UI.""" import time import signal import sys from typing import Dict, Any, Optional, List from dataclasses import dataclass from core.simulation_core import SimulationCore, SimulationConfig from core.event_bus import EventBus from output import MetricsCollector, EntityCollector, EvolutionCollector from output.formatters.json_formatter import JSONFormatter from output.formatters.csv_formatter import CSVFormatter from output.writers.file_writer import FileWriter @dataclass class HeadlessConfig: """Configuration for headless simulation engine.""" simulation: SimulationConfig max_ticks: Optional[int] = None max_duration: Optional[float] = None # seconds output_dir: str = "simulation_output" enable_metrics: bool = True enable_entities: bool = True enable_evolution: bool = True metrics_interval: int = 100 entities_interval: int = 1000 evolution_interval: int = 1000 output_formats: List[str] = None # ['json', 'csv'] real_time: bool = False # Whether to run in real-time or as fast as possible def __post_init__(self): if self.output_formats is None: self.output_formats = ['json'] class HeadlessSimulationEngine: """Headless simulation engine with data collection capabilities.""" def __init__(self, config: HeadlessConfig): self.config = config self.event_bus = EventBus() self.simulation_core = SimulationCore(config.simulation, self.event_bus) self.file_writer = FileWriter(config.output_dir) self.formatters = self._create_formatters() self.collectors = self._create_collectors() # Runtime state self.running = False self.start_time = None self.tick_data = {} self.batch_data = { 'metrics': [], 'entities': [], 'evolution': [] } # Setup signal handlers for graceful shutdown signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _create_formatters(self) -> Dict[str, Any]: """Create data formatters.""" formatters = {} if 'json' in self.config.output_formats: formatters['json'] = JSONFormatter() if 'csv' in self.config.output_formats: formatters['csv'] = CSVFormatter() return formatters def _create_collectors(self) -> Dict[str, Any]: """Create data collectors.""" collectors = {} if self.config.enable_metrics: collectors['metrics'] = MetricsCollector(self.config.metrics_interval) if self.config.enable_entities: collectors['entities'] = EntityCollector( self.config.entities_interval, include_cells=True, include_food=False ) if self.config.enable_evolution: collectors['evolution'] = EvolutionCollector(self.config.evolution_interval) return collectors def run(self) -> Dict[str, Any]: """Run the headless simulation.""" # Determine if we should run at max speed max_speed_mode = not self.config.real_time and self.config.simulation.default_tps >= 1000 print(f"Starting headless simulation...") print(f"Output directory: {self.config.output_dir}") print(f"Max ticks: {self.config.max_ticks or 'unlimited'}") print(f"Max duration: {self.config.max_duration or 'unlimited'} seconds") print(f"Real-time mode: {self.config.real_time}") print(f"Speed mode: {'Maximum speed' if max_speed_mode else f'{self.config.simulation.default_tps} TPS'}") print(f"Output formats: {', '.join(self.config.output_formats)}") print(f"Collectors: {', '.join(self.collectors.keys())}") print() self.running = True self.start_time = time.time() self.simulation_core.start() # Enable sprint mode for maximum speed if not real-time mode if max_speed_mode: self.simulation_core.timing.set_sprint_mode(True) print("Running at maximum speed (sprint mode enabled)") last_batch_time = time.time() batch_interval = 5.0 try: while self.running: # Check termination conditions if self._should_terminate(): break # Update simulation if max_speed_mode: # In max speed mode, update as fast as possible self.simulation_core.update(0.0) else: # Normal timing-based updates self.simulation_core.update(0.016) # ~60 FPS equivalent # Collect data self._collect_data() # Write batch data periodically if time.time() - last_batch_time >= batch_interval: self._write_batch_data() last_batch_time = time.time() # Real-time delay if needed if self.config.real_time: time.sleep(0.016) # ~60 FPS except KeyboardInterrupt: print("\nSimulation interrupted by user") except Exception as e: print(f"Simulation error: {e}") import traceback traceback.print_exc() finally: self._finalize() return self._get_summary() def _should_terminate(self) -> bool: """Check if simulation should terminate.""" # Check max ticks if self.config.max_ticks and self.simulation_core.state.total_ticks >= self.config.max_ticks: print(f"Reached max ticks: {self.config.max_ticks}") return True # Check max duration if self.config.max_duration: elapsed = time.time() - self.start_time if elapsed >= self.config.max_duration: print(f"Reached max duration: {self.config.max_duration} seconds") return True return False def _collect_data(self): """Collect data from all collectors.""" for collector_name, collector in self.collectors.items(): data_list = collector.update(self.simulation_core) for data in data_list: self.batch_data[collector_name].append(data) def _write_batch_data(self): """Write collected data to files.""" if not self.file_writer.is_ready(): return for collector_name, data_list in self.batch_data.items(): if not data_list: continue for format_name, formatter in self.formatters.items(): # Group data by tick and write one file per tick data_by_tick = {} for data in data_list: tick = data.get('tick_count', data.get('tick', self.simulation_core.state.total_ticks)) if tick not in data_by_tick: data_by_tick[tick] = [] data_by_tick[tick].append(data) # Write one file per tick for this collector for tick, tick_data in data_by_tick.items(): filename = f"{collector_name}_tick{tick}.{formatter.get_file_extension()}" # If multiple data items for same tick, combine them or write the latest one combined_data = tick_data[-1] if len(tick_data) == 1 else { 'timestamp': tick_data[0].get('timestamp'), 'tick_count': tick, 'collection_type': collector_name, 'multiple_entries': len(tick_data), 'data': tick_data } formatted_data = formatter.format(combined_data) self.file_writer.write(formatted_data, filename) # Clear written data data_list.clear() print(f"Wrote batch data at tick {self.simulation_core.state.total_ticks}") def _finalize(self): """Finalize simulation and write remaining data.""" print("Finalizing simulation...") # Write any remaining data self._write_batch_data() # Write final summary summary = self._get_summary() if 'json' in self.formatters: summary_data = self.formatters['json'].format(summary) self.file_writer.write(summary_data, "simulation_summary.json") # Stop simulation self.simulation_core.stop() self.file_writer.close() print("Simulation completed") def _get_summary(self) -> Dict[str, Any]: """Get simulation summary.""" duration = time.time() - self.start_time if self.start_time else 0 world_state = self.simulation_core.get_world_state() return { 'simulation_config': { 'grid_width': self.config.simulation.grid_width, 'grid_height': self.config.simulation.grid_height, 'initial_cells': self.config.simulation.initial_cells, 'initial_food': self.config.simulation.initial_food, 'default_tps': self.config.simulation.default_tps }, 'runtime': { 'duration_seconds': duration, 'total_ticks': self.simulation_core.state.total_ticks, 'average_tps': self.simulation_core.state.total_ticks / duration if duration > 0 else 0, 'final_actual_tps': self.simulation_core.state.actual_tps }, 'final_state': world_state, 'data_collection': { 'collectors_used': list(self.collectors.keys()), 'output_formats': self.config.output_formats, 'output_directory': self.config.output_dir } } def _signal_handler(self, signum, frame): """Handle shutdown signals.""" print(f"\nReceived signal {signum}, shutting down gracefully...") self.running = False def get_real_time_status(self) -> Dict[str, Any]: """Get current simulation status (useful for monitoring).""" return { 'running': self.running, 'ticks': self.simulation_core.state.total_ticks, 'tps': self.simulation_core.state.actual_tps, 'duration': time.time() - self.start_time if self.start_time else 0, 'world_state': self.simulation_core.get_world_state() }