"""Headless simulation engine for running simulations without UI.""" import time import signal import sys from typing import Dict, Any, Optional, List from dataclasses import dataclass from core.simulation_core import SimulationCore, SimulationConfig from core.event_bus import EventBus from output import MetricsCollector, EntityCollector, EvolutionCollector from output.formatters.json_formatter import JSONFormatter from output.formatters.csv_formatter import CSVFormatter from output.writers.file_writer import FileWriter try: from tqdm import tqdm TQDM_AVAILABLE = True except ImportError: TQDM_AVAILABLE = False @dataclass class HeadlessConfig: """Configuration for headless simulation engine.""" simulation: SimulationConfig max_ticks: Optional[int] = None max_duration: Optional[float] = None # seconds output_dir: str = "simulation_output" enable_metrics: bool = True enable_entities: bool = True enable_evolution: bool = True metrics_interval: int = 100 entities_interval: int = 1000 evolution_interval: int = 1000 output_formats: List[str] = None # ['json', 'csv'] real_time: bool = False # Whether to run in real-time or as fast as possible def __post_init__(self): if self.output_formats is None: self.output_formats = ['json'] class HeadlessSimulationEngine: """Headless simulation engine with data collection capabilities.""" def __init__(self, config: HeadlessConfig): self.config = config self.event_bus = EventBus() self.simulation_core = SimulationCore(config.simulation, self.event_bus) self.file_writer = FileWriter(config.output_dir) self.formatters = self._create_formatters() self.collectors = self._create_collectors() # Runtime state self.running = False self.start_time = None self.tick_data = {} self.batch_data = { 'metrics': [], 'entities': [], 'evolution': [] } # Progress tracking self.files_written = 0 self.last_progress_update = 0 self.progress_update_interval = 1.0 # Update progress every second self.progress_bar = None # Setup signal handlers for graceful shutdown signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _create_formatters(self) -> Dict[str, Any]: """Create data formatters.""" formatters = {} if 'json' in self.config.output_formats: formatters['json'] = JSONFormatter() if 'csv' in self.config.output_formats: formatters['csv'] = CSVFormatter() return formatters def _create_collectors(self) -> Dict[str, Any]: """Create data collectors.""" collectors = {} if self.config.enable_metrics: collectors['metrics'] = MetricsCollector(self.config.metrics_interval) if self.config.enable_entities: collectors['entities'] = EntityCollector( self.config.entities_interval, include_cells=True, include_food=False ) if self.config.enable_evolution: collectors['evolution'] = EvolutionCollector(self.config.evolution_interval) return collectors def _init_progress_bar(self): """Initialize progress bar for simulation.""" if not TQDM_AVAILABLE: return # Determine progress total based on configuration if self.config.max_ticks: total = self.config.max_ticks unit = 'ticks' elif self.config.max_duration: total = int(self.config.max_duration) unit = 'sec' else: # No clear total - create indeterminate progress bar total = None unit = 'ticks' if total: self.progress_bar = tqdm( total=total, unit=unit, desc="Simulation", leave=True, # Keep the bar when done bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]' ) else: self.progress_bar = tqdm( unit='ticks', desc="Simulation", leave=True, bar_format='{l_bar}{bar}| {n_fmt} [{elapsed}, {rate_fmt}]' ) def _update_progress_bar(self): """Update progress bar with current status.""" current_time = time.time() if current_time - self.last_progress_update < self.progress_update_interval: return current_tick = self.simulation_core.state.total_ticks tps = self.simulation_core.state.actual_tps elapsed = current_time - self.start_time if TQDM_AVAILABLE and self.progress_bar: # Use tqdm progress bar if self.config.max_ticks: # Update based on tick progress progress = min(current_tick, self.config.max_ticks) self.progress_bar.n = progress self.progress_bar.set_postfix({ 'TPS': f'{tps:.1f}', 'Files': self.files_written }) elif self.config.max_duration: # Update based on elapsed time progress = min(elapsed, self.config.max_duration) self.progress_bar.n = int(progress) self.progress_bar.set_postfix({ 'TPS': f'{tps:.1f}', 'Files': self.files_written, 'Tick': current_tick }) else: # Indeterminate progress self.progress_bar.n = current_tick self.progress_bar.set_postfix({ 'TPS': f'{tps:.1f}', 'Files': self.files_written }) self.progress_bar.refresh() else: # Simple text-based progress eta_text = "" if self.config.max_ticks and current_tick > 0: tick_rate = current_tick / elapsed if elapsed > 0 else 0 remaining_ticks = self.config.max_ticks - current_tick eta_seconds = remaining_ticks / tick_rate if tick_rate > 0 else 0 eta_minutes, eta_seconds = divmod(eta_seconds, 60) eta_text = f"ETA: {int(eta_minutes)}m{int(eta_seconds)}s" elif self.config.max_duration: remaining_seconds = self.config.max_duration - elapsed eta_minutes, eta_seconds = divmod(remaining_seconds, 60) eta_text = f"ETA: {int(eta_minutes)}m{int(eta_seconds)}s" # Calculate progress percentage if we have a limit progress_pct = "" if self.config.max_ticks: pct = (current_tick / self.config.max_ticks) * 100 progress_pct = f"{pct:.1f}%" elif self.config.max_duration: pct = (elapsed / self.config.max_duration) * 100 progress_pct = f"{pct:.1f}%" progress_line = f"[{current_time - self.start_time:.1f}s] " if progress_pct: progress_line += f"Progress: {progress_pct} " progress_line += f"Tick: {current_tick} TPS: {tps:.1f} Files: {self.files_written}" if eta_text: progress_line += f" {eta_text}" # Overwrite the previous line (using carriage return) print(f"\r{progress_line}", end="", flush=True) self.last_progress_update = current_time def _close_progress_bar(self): """Close the progress bar.""" if not TQDM_AVAILABLE and self.running: # Print a newline to clear the text progress line print() elif TQDM_AVAILABLE and self.progress_bar: self.progress_bar.close() def run(self) -> Dict[str, Any]: """Run the headless simulation.""" # Determine if we should run at max speed max_speed_mode = not self.config.real_time and self.config.simulation.default_tps >= 1000 print(f"Starting headless simulation...") print(f"Output directory: {self.config.output_dir}") print(f"Max ticks: {self.config.max_ticks or 'unlimited'}") print(f"Max duration: {self.config.max_duration or 'unlimited'} seconds") print(f"Real-time mode: {self.config.real_time}") print(f"Speed mode: {'Maximum speed' if max_speed_mode else f'{self.config.simulation.default_tps} TPS'}") print(f"Output formats: {', '.join(self.config.output_formats)}") print(f"Collectors: {', '.join(self.collectors.keys())}") print() self.running = True self.start_time = time.time() self.simulation_core.start() # Initialize progress bar self._init_progress_bar() # Enable sprint mode for maximum speed if not real-time mode if max_speed_mode: self.simulation_core.timing.set_sprint_mode(True) print("Running at maximum speed (sprint mode enabled)") last_batch_time = time.time() batch_interval = 5.0 try: while self.running: # Check termination conditions if self._should_terminate(): break # Update simulation if max_speed_mode: # In max speed mode, update as fast as possible self.simulation_core.update(0.0) else: # Normal timing-based updates self.simulation_core.update(0.016) # ~60 FPS equivalent # Collect data self._collect_data() # Write batch data periodically if time.time() - last_batch_time >= batch_interval: self._write_batch_data() last_batch_time = time.time() # Update progress bar self._update_progress_bar() # Real-time delay if needed if self.config.real_time: time.sleep(0.016) # ~60 FPS except KeyboardInterrupt: print("\nSimulation interrupted by user") except Exception as e: print(f"Simulation error: {e}") import traceback traceback.print_exc() finally: self._finalize() return self._get_summary() def _should_terminate(self) -> bool: """Check if simulation should terminate.""" # Check max ticks if self.config.max_ticks and self.simulation_core.state.total_ticks >= self.config.max_ticks: print(f"Reached max ticks: {self.config.max_ticks}") return True # Check max duration if self.config.max_duration: elapsed = time.time() - self.start_time if elapsed >= self.config.max_duration: print(f"Reached max duration: {self.config.max_duration} seconds") return True return False def _collect_data(self): """Collect data from all collectors.""" for collector_name, collector in self.collectors.items(): data_list = collector.update(self.simulation_core) for data in data_list: self.batch_data[collector_name].append(data) def _write_batch_data(self): """Write collected data to files.""" if not self.file_writer.is_ready(): return for collector_name, data_list in self.batch_data.items(): if not data_list: continue for format_name, formatter in self.formatters.items(): # Group data by tick and write one file per tick data_by_tick = {} for data in data_list: tick = data.get('tick_count', data.get('tick', self.simulation_core.state.total_ticks)) if tick not in data_by_tick: data_by_tick[tick] = [] data_by_tick[tick].append(data) # Write one file per tick for this collector for tick, tick_data in data_by_tick.items(): filename = f"{collector_name}_tick{tick}.{formatter.get_file_extension()}" # If multiple data items for same tick, combine them or write the latest one combined_data = tick_data[-1] if len(tick_data) == 1 else { 'timestamp': tick_data[0].get('timestamp'), 'tick_count': tick, 'collection_type': collector_name, 'multiple_entries': len(tick_data), 'data': tick_data } formatted_data = formatter.format(combined_data) self.file_writer.write(formatted_data, filename) self.files_written += 1 # Clear written data data_list.clear() print(f"Wrote batch data at tick {self.simulation_core.state.total_ticks}") def _finalize(self): """Finalize simulation and write remaining data.""" # Close progress bar self._close_progress_bar() print("Finalizing simulation...") # Write any remaining data self._write_batch_data() # Write final summary summary = self._get_summary() if 'json' in self.formatters: summary_data = self.formatters['json'].format(summary) self.file_writer.write(summary_data, "simulation_summary.json") self.files_written += 1 # Stop simulation self.simulation_core.stop() self.file_writer.close() print("Simulation completed") print(f"Total files written: {self.files_written}") def _get_summary(self) -> Dict[str, Any]: """Get simulation summary.""" duration = time.time() - self.start_time if self.start_time else 0 world_state = self.simulation_core.get_world_state() return { 'simulation_config': { 'grid_width': self.config.simulation.grid_width, 'grid_height': self.config.simulation.grid_height, 'initial_cells': self.config.simulation.initial_cells, 'initial_food': self.config.simulation.initial_food, 'default_tps': self.config.simulation.default_tps }, 'runtime': { 'duration_seconds': duration, 'total_ticks': self.simulation_core.state.total_ticks, 'average_tps': self.simulation_core.state.total_ticks / duration if duration > 0 else 0, 'final_actual_tps': self.simulation_core.state.actual_tps }, 'final_state': world_state, 'data_collection': { 'collectors_used': list(self.collectors.keys()), 'output_formats': self.config.output_formats, 'output_directory': self.config.output_dir } } def _signal_handler(self, signum, frame): """Handle shutdown signals.""" print(f"\nReceived signal {signum}, shutting down gracefully...") self.running = False def get_real_time_status(self) -> Dict[str, Any]: """Get current simulation status (useful for monitoring).""" return { 'running': self.running, 'ticks': self.simulation_core.state.total_ticks, 'tps': self.simulation_core.state.actual_tps, 'duration': time.time() - self.start_time if self.start_time else 0, 'world_state': self.simulation_core.get_world_state() }