"""Timing and TPS management for simulation.""" import time from dataclasses import dataclass from typing import Optional from .event_bus import EventBus, EventType, Event @dataclass class TimingState: """Current timing state.""" tps: float is_paused: bool = False sprint_mode: bool = False speed_multiplier: float = 1.0 def __post_init__(self): if self.speed_multiplier == 0: self.speed_multiplier = 1.0 class TimingController: """Manages simulation timing, TPS, and pause states.""" def __init__(self, default_tps: float = 40.0, event_bus: Optional[EventBus] = None): self.default_tps = default_tps self.state = TimingState(tps=default_tps) self.event_bus = event_bus # Timing variables self.last_tick_time = time.perf_counter() self.last_tps_time = time.perf_counter() self.tick_counter = 0 self.total_ticks = 0 self.actual_tps = 0 # Sprint mode specific self.sprint_start_time = None def set_tps(self, tps: float): """Set target TPS.""" if tps > 0: self.state.tps = max(1.0, min(1000.0, tps)) self.state.speed_multiplier = self.state.tps / self.default_tps self._notify_state_change() def set_speed_multiplier(self, multiplier: float): """Set speed multiplier for simulation.""" if multiplier > 0: self.state.speed_multiplier = max(0.1, min(10.0, multiplier)) self.state.tps = self.default_tps * self.state.speed_multiplier self._notify_state_change() def toggle_pause(self): """Toggle pause state.""" self.state.is_paused = not self.state.is_paused if self.state.is_paused: # Reset timing when paused self.last_tick_time = time.perf_counter() self.last_tps_time = time.perf_counter() self._notify_state_change() def set_pause(self, paused: bool): """Set pause state directly.""" if self.state.is_paused != paused: self.toggle_pause() def toggle_sprint_mode(self): """Toggle sprint mode.""" self.state.sprint_mode = not self.state.sprint_mode if self.state.sprint_mode: self.sprint_start_time = time.perf_counter() else: self.sprint_start_time = None self._notify_state_change() def set_sprint_mode(self, enabled: bool): """Set sprint mode directly.""" if self.state.sprint_mode != enabled: self.toggle_sprint_mode() def should_tick(self) -> bool: """Check if simulation should tick based on timing.""" if self.state.is_paused: return False if self.state.sprint_mode: return True tick_interval = 1.0 / self.state.tps current_time = time.perf_counter() return current_time - self.last_tick_time >= tick_interval def update_timing(self): """Update timing variables after a tick.""" current_time = time.perf_counter() # Update tick counters first self.tick_counter += 1 self.total_ticks += 1 if self.state.sprint_mode and self.sprint_start_time: # In sprint mode, calculate TPS based on total sprint duration sprint_duration = current_time - self.sprint_start_time if sprint_duration > 0: self.actual_tps = self.tick_counter / sprint_duration else: self.actual_tps = 0 else: # Normal mode TPS calculation using sliding window for more responsive display elapsed_time = current_time - self.last_tps_time # Update TPS more frequently for responsive display if elapsed_time >= 0.5: # Update every 500ms instead of 1 second # Calculate actual TPS based on elapsed time self.actual_tps = self.tick_counter / elapsed_time # Reset for next measurement period self.last_tps_time = current_time self.tick_counter = 0 # Update last tick time for next tick calculation - advance by tick interval if not self.state.sprint_mode: tick_interval = 1.0 / self.state.tps self.last_tick_time += tick_interval self._notify_timing_update() def get_display_tps(self) -> int: """Get TPS rounded to nearest whole number for display.""" return round(self.actual_tps) def reset_counters(self): """Reset timing counters.""" self.tick_counter = 0 self.total_ticks = 0 self.actual_tps = 0 self.last_tick_time = time.perf_counter() self.last_tps_time = time.perf_counter() if self.state.sprint_mode: self.sprint_start_time = time.perf_counter() def get_tick_interval(self) -> float: """Get current tick interval in seconds.""" return 1.0 / self.state.tps if self.state.tps > 0 else 1.0 def get_current_speed_display(self) -> str: """Get current speed display string.""" if self.state.sprint_mode: return "Sprint" elif self.state.is_paused: return "Paused" elif self.state.speed_multiplier == 1.0: return "1x" elif self.state.speed_multiplier in [0.5, 2.0, 4.0, 8.0]: return f"{self.state.speed_multiplier}x" else: return f"{self.state.speed_multiplier:.1f}x" def _notify_state_change(self): """Notify subscribers of state change.""" if self.event_bus: event = Event( type=EventType.SIMULATION_STATE_CHANGED, data={ 'timing_state': self.state, 'tps': self.state.tps, 'is_paused': self.state.is_paused, 'sprint_mode': self.state.sprint_mode, 'speed_multiplier': self.state.speed_multiplier } ) self.event_bus.publish(event) def _notify_timing_update(self): """Notify subscribers of timing update.""" if self.event_bus: event = Event( type=EventType.TIMING_UPDATE, data={ 'actual_tps': self.actual_tps, 'total_ticks': self.total_ticks, 'tick_counter': self.tick_counter } ) self.event_bus.publish(event)