408 lines
15 KiB
Python
408 lines
15 KiB
Python
"""Headless simulation engine for running simulations without UI."""
|
|
|
|
import time
|
|
import signal
|
|
from typing import Dict, Any, Optional, List
|
|
from dataclasses import dataclass
|
|
|
|
from core.simulation_core import SimulationCore, SimulationConfig
|
|
from core.event_bus import EventBus
|
|
from output import MetricsCollector, EntityCollector, EvolutionCollector
|
|
from output.formatters.json_formatter import JSONFormatter
|
|
from output.formatters.csv_formatter import CSVFormatter
|
|
from output.writers.file_writer import FileWriter
|
|
try:
|
|
from tqdm import tqdm
|
|
TQDM_AVAILABLE = True
|
|
except ImportError:
|
|
TQDM_AVAILABLE = False
|
|
|
|
|
|
@dataclass
class HeadlessConfig:
    """Configuration for headless simulation engine.

    Only ``simulation`` is required; every other field has a default.
    ``output_formats`` falls back to ``['json']`` when left as ``None``
    (see ``__post_init__``).
    """

    # Settings handed to the simulation core (forward reference by name).
    simulation: "SimulationConfig"
    # Stop after this many ticks; a falsy value means no tick limit.
    max_ticks: Optional[int] = None
    # Stop after this much wall-clock time; a falsy value means no limit.
    max_duration: Optional[float] = None  # seconds
    # Directory passed to the file writer.
    output_dir: str = "simulation_output"
    # Switches selecting which collectors the engine creates.
    enable_metrics: bool = True
    enable_entities: bool = True
    enable_evolution: bool = True
    # Interval values passed to the matching collectors.
    metrics_interval: int = 100
    entities_interval: int = 1000
    evolution_interval: int = 1000
    # None is a sentinel replaced in __post_init__, so every instance gets
    # its own list instead of sharing one mutable default.
    output_formats: Optional[List[str]] = None  # ['json', 'csv']
    real_time: bool = False  # Whether to run in real-time or as fast as possible

    def __post_init__(self):
        """Replace the ``None`` sentinel for ``output_formats`` with ``['json']``."""
        if self.output_formats is None:
            self.output_formats = ['json']
|
|
|
|
|
|
class HeadlessSimulationEngine:
    """Headless simulation engine with data collection capabilities.

    Drives a ``SimulationCore`` in a loop without any UI, polls the
    configured collectors after every update, periodically writes the
    collected records through the file writer, and reports progress either
    with a tqdm bar (when tqdm is importable) or a single text line.
    """

    def __init__(self, config: "HeadlessConfig"):
        """Build the core, writer, formatters and collectors from ``config``.

        Args:
            config: Engine configuration; ``config.simulation`` is forwarded
                to the simulation core.
        """
        self.config = config
        self.event_bus = EventBus()
        self.simulation_core = SimulationCore(config.simulation, self.event_bus)
        self.file_writer = FileWriter(config.output_dir)
        self.formatters = self._create_formatters()
        self.collectors = self._create_collectors()

        # Runtime state
        self.running = False
        self.start_time = None
        self.tick_data = {}
        # Records waiting to be written, keyed by collector name.
        self.batch_data = {
            'metrics': [],
            'entities': [],
            'evolution': [],
        }

        # Progress tracking
        self.files_written = 0
        self.last_progress_update = 0
        self.progress_update_interval = 1.0  # Update progress every second
        self.progress_bar = None

        # Setup signal handlers for graceful shutdown
        self._install_signal_handlers()

    def _install_signal_handlers(self) -> None:
        """Register ``_signal_handler`` for SIGINT and SIGTERM (best effort)."""
        try:
            signal.signal(signal.SIGINT, self._signal_handler)
            signal.signal(signal.SIGTERM, self._signal_handler)
        except ValueError:
            # signal.signal() may only be called from the main thread. An
            # engine built on a worker thread still works; it just cannot be
            # stopped through these signals.
            pass

    def _create_formatters(self) -> Dict[str, Any]:
        """Create data formatters.

        Returns:
            Mapping of format name to formatter for each of ``'json'`` and
            ``'csv'`` present in ``config.output_formats``; other names are
            ignored.
        """
        formatters = {}
        requested = self.config.output_formats
        if 'json' in requested:
            formatters['json'] = JSONFormatter()
        if 'csv' in requested:
            formatters['csv'] = CSVFormatter()
        return formatters

    def _create_collectors(self) -> Dict[str, Any]:
        """Create data collectors.

        Returns:
            Mapping of collector name (``'metrics'``, ``'entities'``,
            ``'evolution'``) to collector, one per enabled ``enable_*`` flag.
        """
        collectors = {}

        if self.config.enable_metrics:
            collectors['metrics'] = MetricsCollector(self.config.metrics_interval)

        if self.config.enable_entities:
            collectors['entities'] = EntityCollector(
                self.config.entities_interval,
                include_cells=True,
                include_food=False,
            )

        if self.config.enable_evolution:
            collectors['evolution'] = EvolutionCollector(self.config.evolution_interval)

        return collectors

    def _init_progress_bar(self) -> None:
        """Initialize progress bar for simulation.

        Does nothing when tqdm is unavailable. The bar counts ticks when
        ``max_ticks`` is set, whole seconds when only ``max_duration`` is
        set, and is created without a total otherwise (including when the
        duration truncates to zero seconds).
        """
        if not TQDM_AVAILABLE:
            return

        # Determine progress total based on configuration
        if self.config.max_ticks:
            total = self.config.max_ticks
            unit = 'ticks'
        elif self.config.max_duration:
            total = int(self.config.max_duration)
            unit = 'sec'
        else:
            # No clear total - create indeterminate progress bar
            total = None
            unit = 'ticks'

        if total:
            self.progress_bar = tqdm(
                total=total,
                unit=unit,
                desc="Simulation",
                leave=True,  # Keep the bar when done
                bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]',
            )
        else:
            self.progress_bar = tqdm(
                unit='ticks',
                desc="Simulation",
                leave=True,
                bar_format='{l_bar}{bar}| {n_fmt} [{elapsed}, {rate_fmt}]',
            )

    def _update_progress_bar(self) -> None:
        """Update progress bar with current status.

        Throttled: returns immediately unless ``progress_update_interval``
        seconds have passed since the previous update.
        """
        current_time = time.time()
        if current_time - self.last_progress_update < self.progress_update_interval:
            return

        current_tick = self.simulation_core.state.total_ticks
        tps = self.simulation_core.state.actual_tps
        elapsed = current_time - self.start_time

        # Compare against None instead of truth-testing the bar: a tqdm bar
        # created without a total raises TypeError from bool().
        bar = self.progress_bar
        if bar is not None:
            postfix = {
                'TPS': f'{tps:.1f}',
                'Files': self.files_written,
            }
            if self.config.max_ticks:
                # Update based on tick progress
                bar.n = min(current_tick, self.config.max_ticks)
            elif self.config.max_duration:
                # Update based on elapsed time
                bar.n = int(min(elapsed, self.config.max_duration))
                postfix['Tick'] = current_tick
            else:
                # Indeterminate progress
                bar.n = current_tick
            bar.set_postfix(postfix)
            bar.refresh()
        else:
            # Simple text-based progress; the carriage return overwrites the
            # previously printed line.
            progress_line = self._format_text_progress(current_tick, tps, elapsed)
            print(f"\r{progress_line}", end="", flush=True)

        self.last_progress_update = current_time

    def _format_text_progress(self, current_tick, tps, elapsed) -> str:
        """Build the one-line text progress report used without tqdm.

        Args:
            current_tick: Ticks simulated so far.
            tps: Ticks-per-second figure to display.
            elapsed: Seconds since the run started.

        Returns:
            The progress line, without the leading carriage return.
        """
        max_ticks = self.config.max_ticks
        max_duration = self.config.max_duration

        eta_text = ""
        if max_ticks and current_tick > 0:
            tick_rate = current_tick / elapsed if elapsed > 0 else 0
            # Clamp so that overshooting the limit never yields a negative ETA.
            remaining_ticks = max(0, max_ticks - current_tick)
            eta_seconds = remaining_ticks / tick_rate if tick_rate > 0 else 0
            eta_minutes, eta_seconds = divmod(eta_seconds, 60)
            eta_text = f"ETA: {int(eta_minutes)}m{int(eta_seconds)}s"
        elif max_duration:
            remaining_seconds = max(0, max_duration - elapsed)
            eta_minutes, eta_seconds = divmod(remaining_seconds, 60)
            eta_text = f"ETA: {int(eta_minutes)}m{int(eta_seconds)}s"

        # Calculate progress percentage if we have a limit
        progress_pct = ""
        if max_ticks:
            pct = (current_tick / max_ticks) * 100
            progress_pct = f"{pct:.1f}%"
        elif max_duration:
            pct = (elapsed / max_duration) * 100
            progress_pct = f"{pct:.1f}%"

        progress_line = f"[{elapsed:.1f}s] "
        if progress_pct:
            progress_line += f"Progress: {progress_pct} "
        progress_line += f"Tick: {current_tick} TPS: {tps:.1f} Files: {self.files_written}"
        if eta_text:
            progress_line += f" {eta_text}"
        return progress_line

    def _close_progress_bar(self) -> None:
        """Close the progress bar, or end the text progress line."""
        bar = self.progress_bar
        if bar is not None:
            # Identity check: truth-testing a total-less tqdm bar raises.
            bar.close()
            self.progress_bar = None
        elif self.running and not TQDM_AVAILABLE:
            # Print a newline to clear the text progress line
            print()

    def run(self) -> Dict[str, Any]:
        """Run the headless simulation.

        Loops until a termination condition is met or ``running`` is cleared
        by the signal handler. Exceptions raised inside the loop are printed
        with a traceback rather than propagated; finalization always runs.

        Returns:
            The summary dictionary produced by ``_get_summary``.
        """
        # Determine if we should run at max speed
        max_speed_mode = not self.config.real_time and self.config.simulation.default_tps >= 1000
        if max_speed_mode:
            speed_label = 'Maximum speed'
        else:
            speed_label = f'{self.config.simulation.default_tps} TPS'

        print("Starting headless simulation...")
        print(f"Output directory: {self.config.output_dir}")
        print(f"Max ticks: {self.config.max_ticks or 'unlimited'}")
        print(f"Max duration: {self.config.max_duration or 'unlimited'} seconds")
        print(f"Real-time mode: {self.config.real_time}")
        print(f"Speed mode: {speed_label}")
        print(f"Output formats: {', '.join(self.config.output_formats)}")
        print(f"Collectors: {', '.join(self.collectors.keys())}")
        print()

        self.running = True
        self.start_time = time.time()
        self.simulation_core.start()

        # Initialize progress bar
        self._init_progress_bar()

        # Enable sprint mode for maximum speed if not real-time mode
        if max_speed_mode:
            self.simulation_core.timing.set_sprint_mode(True)

        last_batch_time = time.time()
        batch_interval = 5.0  # seconds between batch writes

        try:
            while self.running:
                # Check termination conditions
                if self._should_terminate():
                    break

                # Update simulation
                if max_speed_mode:
                    # In max speed mode, update as fast as possible
                    self.simulation_core.update(0.0)
                else:
                    # Normal timing-based updates
                    self.simulation_core.update(0.016)  # ~60 FPS equivalent

                # Collect data
                self._collect_data()

                # Write batch data periodically
                if time.time() - last_batch_time >= batch_interval:
                    self._write_batch_data()
                    last_batch_time = time.time()

                # Update progress bar
                self._update_progress_bar()

                # Real-time delay if needed
                if self.config.real_time:
                    time.sleep(0.016)  # ~60 FPS

        except KeyboardInterrupt:
            print("\nSimulation interrupted by user")
        except Exception as e:
            print(f"Simulation error: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self._finalize()

        return self._get_summary()

    def _should_terminate(self) -> bool:
        """Check if simulation should terminate.

        Returns:
            True once the tick count reaches ``max_ticks`` or the elapsed
            time reaches ``max_duration``; falsy limits are ignored.
        """
        # Check max ticks
        max_ticks = self.config.max_ticks
        if max_ticks and self.simulation_core.state.total_ticks >= max_ticks:
            print(f"Reached max ticks: {self.config.max_ticks}")
            return True

        # Check max duration
        max_duration = self.config.max_duration
        if max_duration and time.time() - self.start_time >= max_duration:
            print(f"Reached max duration: {self.config.max_duration} seconds")
            return True

        return False

    def _collect_data(self) -> None:
        """Collect data from all collectors into the pending batches."""
        for collector_name, collector in self.collectors.items():
            self.batch_data[collector_name].extend(collector.update(self.simulation_core))

    def _write_batch_data(self) -> None:
        """Write collected data to files.

        Pending records are grouped by tick and written as one file per
        collector, tick and output format, after which the batch is emptied.
        Nothing is written or cleared while the file writer is not ready.
        """
        if not self.file_writer.is_ready():
            return

        for collector_name, data_list in self.batch_data.items():
            if not data_list:
                continue

            # Group data by tick once; the grouping is the same for every
            # output format.
            data_by_tick = {}
            for data in data_list:
                tick = data.get('tick_count', data.get('tick', self.simulation_core.state.total_ticks))
                data_by_tick.setdefault(tick, []).append(data)

            for formatter in self.formatters.values():
                extension = formatter.get_file_extension()
                # Write one file per tick for this collector
                for tick, tick_data in data_by_tick.items():
                    filename = f"{collector_name}_tick{tick}.{extension}"
                    if len(tick_data) == 1:
                        combined_data = tick_data[-1]
                    else:
                        # Several records for the same tick are wrapped in
                        # one envelope.
                        combined_data = {
                            'timestamp': tick_data[0].get('timestamp'),
                            'tick_count': tick,
                            'collection_type': collector_name,
                            'multiple_entries': len(tick_data),
                            'data': tick_data,
                        }
                    formatted_data = formatter.format(combined_data)
                    self.file_writer.write(formatted_data, filename)
                    self.files_written += 1

            # Clear written data
            data_list.clear()

    def _finalize(self) -> None:
        """Finalize simulation and write remaining data.

        Closes the progress display, marks the engine as no longer running,
        writes pending batches and (when a JSON formatter exists) the
        summary file, then stops the core and closes the file writer.
        """
        # Close progress bar
        self._close_progress_bar()
        # The run is over regardless of how the loop ended, so status
        # queries must stop reporting the engine as running.
        self.running = False

        print("Finalizing simulation...")

        try:
            # Write any remaining data
            self._write_batch_data()

            # Write final summary
            summary = self._get_summary()
            if 'json' in self.formatters:
                summary_data = self.formatters['json'].format(summary)
                self.file_writer.write(summary_data, "simulation_summary.json")
                self.files_written += 1
        finally:
            # Stop simulation; release the core and the writer even if
            # writing the remaining output failed.
            self.simulation_core.stop()
            self.file_writer.close()

        print("Simulation completed")
        print(f"Total files written: {self.files_written}")

    def _get_summary(self) -> Dict[str, Any]:
        """Get simulation summary.

        Returns:
            Dictionary with ``simulation_config``, ``runtime``,
            ``final_state`` and ``data_collection`` sections. Duration is
            measured up to the moment of the call and is 0 if the run never
            started.
        """
        duration = time.time() - self.start_time if self.start_time else 0
        world_state = self.simulation_core.get_world_state()
        sim_config = self.config.simulation
        state = self.simulation_core.state

        return {
            'simulation_config': {
                'grid_width': sim_config.grid_width,
                'grid_height': sim_config.grid_height,
                'initial_cells': sim_config.initial_cells,
                'initial_food': sim_config.initial_food,
                'default_tps': sim_config.default_tps,
            },
            'runtime': {
                'duration_seconds': duration,
                'total_ticks': state.total_ticks,
                'average_tps': state.total_ticks / duration if duration > 0 else 0,
                'final_actual_tps': state.actual_tps,
            },
            'final_state': world_state,
            'data_collection': {
                'collectors_used': list(self.collectors.keys()),
                'output_formats': self.config.output_formats,
                'output_directory': self.config.output_dir,
            },
        }

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals by asking the run loop to stop."""
        print(f"\nReceived signal {signum}, shutting down gracefully...")
        self.running = False

    def get_real_time_status(self) -> Dict[str, Any]:
        """Get current simulation status (useful for monitoring).

        Returns:
            Dictionary with ``running``, ``ticks``, ``tps``, ``duration``
            (seconds since start, 0 before the run starts) and
            ``world_state``.
        """
        return {
            'running': self.running,
            'ticks': self.simulation_core.state.total_ticks,
            'tps': self.simulation_core.state.actual_tps,
            'duration': time.time() - self.start_time if self.start_time else 0,
            'world_state': self.simulation_core.get_world_state(),
        }