Sam 3a34759094
Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 8m17s
Add core simulation components and configuration classes
Major rewrite.
2025-11-08 19:17:40 -06:00

186 lines
6.4 KiB
Python

"""Timing and TPS management for simulation."""
import time
from dataclasses import dataclass
from typing import Optional
from .event_bus import EventBus, EventType, Event
@dataclass
class TimingState:
"""Current timing state."""
tps: float
is_paused: bool = False
sprint_mode: bool = False
speed_multiplier: float = 1.0
def __post_init__(self):
if self.speed_multiplier == 0:
self.speed_multiplier = 1.0
class TimingController:
"""Manages simulation timing, TPS, and pause states."""
def __init__(self, default_tps: float = 40.0, event_bus: Optional[EventBus] = None):
self.default_tps = default_tps
self.state = TimingState(tps=default_tps)
self.event_bus = event_bus
# Timing variables
self.last_tick_time = time.perf_counter()
self.last_tps_time = time.perf_counter()
self.tick_counter = 0
self.total_ticks = 0
self.actual_tps = 0
# Sprint mode specific
self.sprint_start_time = None
def set_tps(self, tps: float):
"""Set target TPS."""
if tps > 0:
self.state.tps = max(1.0, min(1000.0, tps))
self.state.speed_multiplier = self.state.tps / self.default_tps
self._notify_state_change()
def set_speed_multiplier(self, multiplier: float):
"""Set speed multiplier for simulation."""
if multiplier > 0:
self.state.speed_multiplier = max(0.1, min(10.0, multiplier))
self.state.tps = self.default_tps * self.state.speed_multiplier
self._notify_state_change()
def toggle_pause(self):
"""Toggle pause state."""
self.state.is_paused = not self.state.is_paused
if self.state.is_paused:
# Reset timing when paused
self.last_tick_time = time.perf_counter()
self.last_tps_time = time.perf_counter()
self._notify_state_change()
def set_pause(self, paused: bool):
"""Set pause state directly."""
if self.state.is_paused != paused:
self.toggle_pause()
def toggle_sprint_mode(self):
"""Toggle sprint mode."""
self.state.sprint_mode = not self.state.sprint_mode
if self.state.sprint_mode:
self.sprint_start_time = time.perf_counter()
else:
self.sprint_start_time = None
self._notify_state_change()
def set_sprint_mode(self, enabled: bool):
"""Set sprint mode directly."""
if self.state.sprint_mode != enabled:
self.toggle_sprint_mode()
def should_tick(self) -> bool:
"""Check if simulation should tick based on timing."""
if self.state.is_paused:
return False
if self.state.sprint_mode:
return True
tick_interval = 1.0 / self.state.tps
current_time = time.perf_counter()
return current_time - self.last_tick_time >= tick_interval
def update_timing(self):
"""Update timing variables after a tick."""
current_time = time.perf_counter()
# Update tick counters first
self.tick_counter += 1
self.total_ticks += 1
if self.state.sprint_mode and self.sprint_start_time:
# In sprint mode, calculate TPS based on total sprint duration
sprint_duration = current_time - self.sprint_start_time
if sprint_duration > 0:
self.actual_tps = self.tick_counter / sprint_duration
else:
self.actual_tps = 0
else:
# Normal mode TPS calculation using sliding window for more responsive display
elapsed_time = current_time - self.last_tps_time
# Update TPS more frequently for responsive display
if elapsed_time >= 0.5: # Update every 500ms instead of 1 second
# Calculate actual TPS based on elapsed time
self.actual_tps = self.tick_counter / elapsed_time
# Reset for next measurement period
self.last_tps_time = current_time
self.tick_counter = 0
# Update last tick time for next tick calculation - advance by tick interval
if not self.state.sprint_mode:
tick_interval = 1.0 / self.state.tps
self.last_tick_time += tick_interval
self._notify_timing_update()
def get_display_tps(self) -> int:
"""Get TPS rounded to nearest whole number for display."""
return round(self.actual_tps)
def reset_counters(self):
"""Reset timing counters."""
self.tick_counter = 0
self.total_ticks = 0
self.actual_tps = 0
self.last_tick_time = time.perf_counter()
self.last_tps_time = time.perf_counter()
if self.state.sprint_mode:
self.sprint_start_time = time.perf_counter()
def get_tick_interval(self) -> float:
"""Get current tick interval in seconds."""
return 1.0 / self.state.tps if self.state.tps > 0 else 1.0
def get_current_speed_display(self) -> str:
"""Get current speed display string."""
if self.state.sprint_mode:
return "Sprint"
elif self.state.is_paused:
return "Paused"
elif self.state.speed_multiplier == 1.0:
return "1x"
elif self.state.speed_multiplier in [0.5, 2.0, 4.0, 8.0]:
return f"{self.state.speed_multiplier}x"
else:
return f"{self.state.speed_multiplier:.1f}x"
def _notify_state_change(self):
"""Notify subscribers of state change."""
if self.event_bus:
event = Event(
type=EventType.SIMULATION_STATE_CHANGED,
data={
'timing_state': self.state,
'tps': self.state.tps,
'is_paused': self.state.is_paused,
'sprint_mode': self.state.sprint_mode,
'speed_multiplier': self.state.speed_multiplier
}
)
self.event_bus.publish(event)
def _notify_timing_update(self):
"""Notify subscribers of timing update."""
if self.event_bus:
event = Event(
type=EventType.TIMING_UPDATE,
data={
'actual_tps': self.actual_tps,
'total_ticks': self.total_ticks,
'tick_counter': self.tick_counter
}
)
self.event_bus.publish(event)