Compare commits

..

2 Commits

Author SHA1 Message Date
Sam
3a34759094 Add core simulation components and configuration classes
Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 8m17s
Major rewrite.
2025-11-08 19:17:40 -06:00
Sam
dc2f6d5fc4 Add profiling for simulation ticks and include snakeviz as a development dependency 2025-11-05 15:56:20 -06:00
28 changed files with 2875 additions and 199 deletions

340
README_ARCHITECTURE.md Normal file
View File

@ -0,0 +1,340 @@
# Dynamic Abstraction System - Complete Architecture
## Overview
The Dynamic Abstraction System is a sophisticated 2D cellular simulation platform featuring autonomous entities with evolving neural networks. The system demonstrates headless-first architecture with clean separation of concerns, supporting both interactive development and production-scale batch processing.
## Architecture Principles
1. **Headless-First Design**: Core simulation logic runs independently of any UI dependencies
2. **Separation of Concerns**: Each component has a single, well-defined responsibility
3. **Loose Coupling**: Components communicate through events and interfaces rather than direct dependencies
4. **Configuration-Driven**: Behavior controlled through structured configuration files
5. **Extensible Data Collection**: Comprehensive metrics and evolution tracking capabilities
6. **Dual-Mode Support**: Both interactive debugging and headless batch processing
## Complete Directory Structure
```
DynamicAbstractionSystem/
├── main.py # Legacy entry point (interactive)
├── entry_points/ # Application entry points
│ ├── headless_main.py # Headless simulation entry
│ ├── interactive_main.py # Interactive simulation entry
│ └── __init__.py
├── core/ # Core simulation infrastructure
│ ├── simulation_core.py # Pure simulation logic (no UI deps)
│ ├── timing.py # TPS/timing management
│ ├── event_bus.py # Decoupled communication system
│ ├── simulation_engine.py # Interactive engine (UI wrapper)
│ ├── input_handler.py # Pure input processing
│ ├── renderer.py # World-to-screen rendering
│ └── __init__.py
├── engines/ # Specialized simulation engines
│ ├── headless_engine.py # Production headless engine
│ └── __init__.py
├── world/ # Simulation world and entities
│ ├── __init__.py
│ ├── world.py # Spatial partitioning system
│ ├── objects.py # Entity implementations
│ ├── base/ # Base entity systems
│ │ ├── brain.py # Neural network implementations
│ │ ├── neural.py # Flexible neural network
│ │ └── behavioral.py # Behavioral models
│ └── simulation_interface.py # Camera and spatial interfaces
├── ui/ # User interface components
│ ├── hud.py # Heads-up display and panels
│ └── __init__.py
├── config/ # Configuration management
│ ├── constants.py # Global constants
│ ├── simulation_config.py # Configuration classes
│ ├── config_loader.py # Configuration loading utilities
│ └── __init__.py
├── output/ # Data collection and output
│ ├── __init__.py
│ ├── collectors/ # Data collection modules
│ │ ├── base_collector.py # Collector interface
│ │ ├── metrics_collector.py # Basic metrics collection
│ │ ├── entity_collector.py # Entity state tracking
│ │ └── evolution_collector.py # Evolution tracking
│ ├── formatters/ # Output formatting
│ │ ├── base_formatter.py # Formatter interface
│ │ ├── json_formatter.py # JSON output
│ │ └── csv_formatter.py # CSV output
│ └── writers/ # Output destinations
│ ├── base_writer.py # Writer interface
│ └── file_writer.py # File system output
└── tests/ # Test suite
├── __init__.py
└── [test files]
```
## Core System Architecture
### 1. Simulation Layer
#### SimulationCore (`core/simulation_core.py`)
- **Purpose**: Pure simulation logic with zero UI dependencies
- **Responsibilities**:
- World management and entity lifecycle
- Entity spawning and spatial queries
- Event-driven state management
- Timing coordination
- **Key Features**:
- Event-driven architecture via EventBus
- Configurable simulation parameters
- Complete UI isolation
- Spatial query methods (radius, range, nearest)
#### TimingController (`core/timing.py`)
- **Purpose**: Precise timing management and simulation control
- **Responsibilities**:
- TPS (ticks per second) regulation
- Pause/sprint state management
- Speed multipliers
- Performance timing
- **Key Features**:
- Configurable target TPS
- Sprint mode for accelerated execution
- Pause/resume functionality
- Real-time performance metrics
#### EventBus (`core/event_bus.py`)
- **Purpose**: Decoupled inter-component communication
- **Responsibilities**:
- Event routing and subscription management
- Type-safe event dispatch
- Event history tracking
- **Key Features**:
- Type-safe event system
- Event history for debugging
- Publisher-subscriber pattern
- Loose coupling between components
### 2. Engine Layer
#### SimulationEngine (`core/simulation_engine.py`)
- **Purpose**: Interactive simulation engine with UI integration
- **Responsibilities**:
- Main game loop coordination
- UI event processing
- Rendering pipeline management
- Component orchestration
- **Key Features**:
- Pygame integration
- Real-time input handling
- Rendering coordination
- UI state synchronization
#### HeadlessSimulationEngine (`engines/headless_engine.py`)
- **Purpose**: Production-grade headless simulation execution
- **Responsibilities**:
- Batch simulation execution
- Data collection management
- Output generation
- Performance monitoring
- **Key Features**:
- No UI dependencies
- Maximum speed execution (2000+ TPS)
- Sprint mode integration for unlimited performance
- Precise TPS control when specified
- Comprehensive data collection
- Multiple output formats
- Configurable execution parameters
- Graceful shutdown handling
### 3. World Layer
#### World (`world/world.py`)
- **Purpose**: Spatial partitioning and entity management
- **Responsibilities**:
- Spatial indexing and partitioning
- Entity lifecycle management
- Collision detection support
- Spatial queries
- **Key Features**:
- Grid-based spatial partitioning
- Double buffering for performance
- Efficient spatial queries
- Entity add/remove operations
#### Entity System (`world/objects.py`, `world/base/`)
- **Purpose**: Entity implementations and behavioral systems
- **Key Components**:
- `BaseEntity`: Abstract base class for all entities
- `DefaultCell`: Autonomous cellular entity with neural networks
- `FoodObject`: Energy source entities
- `CellBrain`: Neural network-based decision making
- `FlexibleNeuralNetwork`: Dynamic topology neural networks
### 4. Data Collection System
#### Collectors (`output/collectors/`)
- **BaseCollector**: Abstract base with tick-based collection logic
- **MetricsCollector**: Basic simulation metrics (TPS, entity counts, timing)
- **EntityCollector**: Detailed entity state snapshots
- **EvolutionCollector**: Neural network evolution and generational tracking
- **Tick-Based Intervals**: Uses simulation ticks for accurate collection timing
- **Every-Tick Option**: Configurable for maximum data resolution
#### Formatters (`output/formatters/`)
- **JSONFormatter**: Human-readable JSON output
- **CSVFormatter**: Spreadsheet-compatible CSV output
#### Writers (`output/writers/`)
- **FileWriter**: File system output with batch processing
### 5. Configuration System
#### Configuration Classes (`config/simulation_config.py`)
- **HeadlessConfig**: Headless simulation parameters
- **InteractiveConfig**: Interactive UI configuration
- **ExperimentConfig**: Automated experiment configuration
- **OutputConfig**: Data collection and output settings
#### Configuration Loader (`config/config_loader.py`)
- **Purpose**: Dynamic configuration loading and validation
- **Features**:
- JSON/YAML support
- Configuration validation
- Sample configuration generation
- Lazy loading to avoid circular dependencies
## Entry Points
### Interactive Mode
```bash
# Standard interactive simulation
python entry_points/interactive_main.py
# With custom configuration
python entry_points/interactive_main.py --config configs/interactive.json
# Legacy compatibility
python main.py
```
### Headless Mode
```bash
# Basic headless execution (maximum speed by default)
python entry_points/headless_main.py
# With custom parameters
python entry_points/headless_main.py --max-ticks 10000 --tps 60 --output-dir results
# With data collection on every tick
python entry_points/headless_main.py --max-ticks 1000 --collect-every-tick
# With configuration file
python entry_points/headless_main.py --config configs/experiment.json
```
## Input and Control System
### Keyboard Controls
- **Space**: Toggle pause/play
- **Right Shift**: Toggle sprint mode
- **Left Shift**: Hold for temporary 2x speed boost
- **S**: Step forward one tick
- **R**: Reset camera position
- **G**: Toggle grid display
- **I**: Toggle interaction radius display
- **L**: Toggle legend display
- **WASD**: Move camera
- **Mouse wheel**: Zoom in/out
- **Middle mouse drag**: Pan camera
- **Left click/drag**: Select objects
### Input Handler Architecture
- **Pure Input Processing**: No state management, only input mapping
- **Callback System**: All simulation control actions use callbacks
- **Event-Driven**: Decoupled from simulation state
## Data Collection and Output
### Comprehensive Metrics
- **Performance Metrics**: TPS, timing statistics, execution duration
- **Entity Statistics**: Population counts, energy distributions, age demographics
- **Evolution Data**: Neural network architectures, generational statistics, mutation tracking
- **World State**: Spatial distributions, resource availability
### Output Formats
- **JSON**: Structured, human-readable, web-compatible
- **CSV**: Tabular format for spreadsheet analysis
- **File-based**: Batch output with configurable intervals
## Performance Architecture
### Spatial Optimization
- **Grid-based Spatial Partitioning**: Efficient entity queries
- **Double Buffering**: Prevents frame interference
- **Spatial Indexing**: Fast proximity and range queries
### Timing Optimization
- **Configurable TPS**: Adjustable simulation speed
- **Maximum Speed Mode**: Unlimited execution speed for batch processing (2000+ TPS)
- **Sprint Mode Integration**: Automatic sprint mode for maximum performance
- **Precise Timing**: Frame-independent simulation updates
- **Accurate TPS Control**: Precise timing when TPS is specified
### Memory Management
- **Entity Pooling**: Efficient memory allocation for entities
- **Event System**: Minimal memory overhead for inter-component communication
- **Lazy Loading**: Configuration and components loaded on demand
## Testing Architecture
### Test Coverage
- **Unit Tests**: Core component isolation testing
- **Integration Tests**: Component interaction testing
- **Performance Tests**: Benchmarking and profiling
- **End-to-End Tests**: Complete simulation workflow testing
### Testing Tools
- **cProfile Integration**: Built-in performance profiling
- **pytest Framework**: Structured test execution
- **Mock Components**: Isolated component testing
## Extension Points
### Custom Collectors
- Extensible data collection system
- Plugin architecture for custom metrics
- Configurable collection intervals
### Custom Formatters
- Pluggable output format system
- Custom serialization support
- Multiple output destinations
### Custom Engines
- Specialized simulation engines
- Custom execution patterns
- Integration interfaces
## Deployment Architecture
### Production Deployment
- **Headless Execution**: Server deployment without display dependencies
- **Batch Processing**: Automated experiment execution
- **Configuration-Driven**: Environment-specific configurations
- **Data Pipeline**: Automated data collection and output
### Development Environment
- **Interactive Debugging**: Real-time visualization and control
- **Hot Reloading**: Configuration changes without restart
- **Performance Monitoring**: Real-time TPS and performance metrics
- **Extensive Logging**: Detailed execution tracking
## Architectural Benefits
1. **Maintainability**: Clear separation of concerns and modular design
2. **Testability**: Isolated components enable comprehensive unit testing
3. **Scalability**: Headless mode supports large-scale batch processing
4. **Flexibility**: Configuration-driven behavior adaptation
5. **Performance**: Optimized spatial queries and timing management
6. **Extensibility**: Plugin architecture for custom functionality
7. **Reliability**: Event-driven architecture with error handling
8. **Portability**: Headless operation enables deployment across platforms
This architecture provides a robust foundation for both interactive development and production-scale simulation experiments, with clean separation between simulation logic, user interface, and data collection systems.

19
config/__init__.py Normal file
View File

@ -0,0 +1,19 @@
"""Configuration system for simulation modes."""
from .simulation_config import (
SimulationConfig,
HeadlessConfig,
InteractiveConfig,
ExperimentConfig,
OutputConfig
)
from .config_loader import ConfigLoader
__all__ = [
'SimulationConfig',
'HeadlessConfig',
'InteractiveConfig',
'ExperimentConfig',
'OutputConfig',
'ConfigLoader'
]

194
config/config_loader.py Normal file
View File

@ -0,0 +1,194 @@
"""Configuration loading and management."""
import json
import yaml
from pathlib import Path
from typing import Dict, Any, Union
from dataclasses import asdict
from .simulation_config import HeadlessConfig, InteractiveConfig, ExperimentConfig
class ConfigLoader:
"""Loads configuration from files and creates config objects."""
@staticmethod
def load_headless_config(config_path: Union[str, Path]) -> HeadlessConfig:
"""Load headless configuration from file."""
config_path = Path(config_path)
if not config_path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, 'r') as f:
if config_path.suffix.lower() == '.json':
data = json.load(f)
elif config_path.suffix.lower() in ['.yml', '.yaml']:
data = yaml.safe_load(f)
else:
raise ValueError(f"Unsupported config format: {config_path.suffix}")
return ConfigLoader._dict_to_headless_config(data)
@staticmethod
def load_interactive_config(config_path: Union[str, Path]) -> InteractiveConfig:
"""Load interactive configuration from file."""
config_path = Path(config_path)
if not config_path.exists():
# Return default config if file doesn't exist
return InteractiveConfig()
with open(config_path, 'r') as f:
if config_path.suffix.lower() == '.json':
data = json.load(f)
elif config_path.suffix.lower() in ['.yml', '.yaml']:
data = yaml.safe_load(f)
else:
raise ValueError(f"Unsupported config format: {config_path.suffix}")
return ConfigLoader._dict_to_interactive_config(data)
@staticmethod
def load_experiment_config(config_path: Union[str, Path]) -> ExperimentConfig:
"""Load experiment configuration from file."""
config_path = Path(config_path)
if not config_path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, 'r') as f:
if config_path.suffix.lower() == '.json':
data = json.load(f)
elif config_path.suffix.lower() in ['.yml', '.yaml']:
data = yaml.safe_load(f)
else:
raise ValueError(f"Unsupported config format: {config_path.suffix}")
return ConfigLoader._dict_to_experiment_config(data)
@staticmethod
def save_config(config: Union[HeadlessConfig, InteractiveConfig, ExperimentConfig],
config_path: Union[str, Path]):
"""Save configuration to file."""
config_path = Path(config_path)
config_path.parent.mkdir(parents=True, exist_ok=True)
# Convert config to dictionary
if isinstance(config, HeadlessConfig):
data = asdict(config)
elif isinstance(config, InteractiveConfig):
data = asdict(config)
elif isinstance(config, ExperimentConfig):
data = asdict(config)
else:
raise ValueError(f"Unknown config type: {type(config)}")
with open(config_path, 'w') as f:
if config_path.suffix.lower() == '.json':
json.dump(data, f, indent=2)
elif config_path.suffix.lower() in ['.yml', '.yaml']:
yaml.dump(data, f, default_flow_style=False)
else:
raise ValueError(f"Unsupported config format: {config_path.suffix}")
@staticmethod
def _dict_to_headless_config(data: Dict[str, Any]) -> HeadlessConfig:
"""Convert dictionary to HeadlessConfig."""
# Extract output config if present
output_data = data.get('output', {})
output_config = OutputConfig(**output_data)
# Extract simulation config if present
sim_data = data.get('simulation', {})
simulation_config = SimulationConfig(**sim_data)
return HeadlessConfig(
max_ticks=data.get('max_ticks'),
max_duration=data.get('max_duration'),
output=output_config,
simulation=simulation_config
)
@staticmethod
def _dict_to_interactive_config(data: Dict[str, Any]) -> InteractiveConfig:
"""Convert dictionary to InteractiveConfig."""
# Extract simulation config if present
sim_data = data.get('simulation', {})
simulation_config = SimulationConfig(**sim_data)
return InteractiveConfig(
window_width=data.get('window_width', 0),
window_height=data.get('window_height', 0),
vsync=data.get('vsync', True),
resizable=data.get('resizable', True),
show_grid=data.get('show_grid', True),
show_interaction_radius=data.get('show_interaction_radius', False),
show_legend=data.get('show_legend', True),
control_bar_height=data.get('control_bar_height', 48),
inspector_width=data.get('inspector_width', 260),
properties_width=data.get('properties_width', 320),
console_height=data.get('console_height', 120),
simulation=simulation_config
)
@staticmethod
def _dict_to_experiment_config(data: Dict[str, Any]) -> ExperimentConfig:
"""Convert dictionary to ExperimentConfig."""
# Extract base config if present
base_data = data.get('base_config', {})
base_config = ConfigLoader._dict_to_headless_config(base_data)
return ExperimentConfig(
name=data.get('name', 'experiment'),
description=data.get('description', ''),
runs=data.get('runs', 1),
run_duration=data.get('run_duration'),
run_ticks=data.get('run_ticks'),
variables=data.get('variables', {}),
base_config=base_config,
aggregate_results=data.get('aggregate_results', True),
aggregate_format=data.get('aggregate_format', 'csv')
)
@staticmethod
def get_default_headless_config() -> HeadlessConfig:
"""Get default headless configuration."""
return HeadlessConfig()
@staticmethod
def get_default_interactive_config() -> InteractiveConfig:
"""Get default interactive configuration."""
return InteractiveConfig()
@staticmethod
def create_sample_configs(output_dir: str = "configs"):
"""Create sample configuration files."""
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
# Sample headless config
headless_config = ConfigLoader.get_default_headless_config()
ConfigLoader.save_config(headless_config, output_path / "headless_default.json")
# Sample interactive config
interactive_config = ConfigLoader.get_default_interactive_config()
ConfigLoader.save_config(interactive_config, output_path / "interactive_default.json")
# Sample experiment config
experiment_config = ExperimentConfig(
name="tps_sweep",
description="Test different TPS values",
runs=5,
run_duration=60.0, # 1 minute per run
variables={
"simulation.default_tps": [10, 20, 40, 80, 160]
}
)
ConfigLoader.save_config(experiment_config, output_path / "experiment_tps_sweep.json")
print(f"Sample configs created in {output_path}")
# Import OutputConfig for the loader
from .simulation_config import OutputConfig, SimulationConfig

109
config/simulation_config.py Normal file
View File

@ -0,0 +1,109 @@
"""Simulation configuration classes for different modes."""
from dataclasses import dataclass, field
from typing import List, Optional
from config.constants import *
@dataclass
class SimulationConfig:
"""Configuration for simulation setup."""
grid_width: int = GRID_WIDTH
grid_height: int = GRID_HEIGHT
cell_size: int = CELL_SIZE
initial_cells: int = 50
initial_food: int = FOOD_OBJECTS_COUNT
food_spawning: bool = FOOD_SPAWNING
random_seed: int = RANDOM_SEED
default_tps: float = DEFAULT_TPS
@dataclass
class OutputConfig:
"""Configuration for data output."""
enabled: bool = True
directory: str = "simulation_output"
formats: List[str] = field(default_factory=lambda: ['json'])
collect_metrics: bool = True
collect_entities: bool = True
collect_evolution: bool = True
metrics_interval: int = 100
entities_interval: int = 1000
evolution_interval: int = 1000
real_time: bool = False
@dataclass
class HeadlessConfig:
"""Configuration for headless simulation mode."""
# Simulation settings
max_ticks: Optional[int] = None
max_duration: Optional[float] = None # seconds
# Output settings
output: OutputConfig = field(default_factory=OutputConfig)
# Simulation core config
simulation: SimulationConfig = field(default_factory=lambda: SimulationConfig(
grid_width=GRID_WIDTH,
grid_height=GRID_HEIGHT,
cell_size=CELL_SIZE,
initial_cells=50,
initial_food=FOOD_OBJECTS_COUNT,
food_spawning=FOOD_SPAWNING,
random_seed=RANDOM_SEED,
default_tps=DEFAULT_TPS
))
@dataclass
class InteractiveConfig:
"""Configuration for interactive simulation mode with UI."""
# Window settings
window_width: int = 0 # 0 = auto-detect
window_height: int = 0 # 0 = auto-detect
vsync: bool = True
resizable: bool = True
# UI settings
show_grid: bool = True
show_interaction_radius: bool = False
show_legend: bool = True
control_bar_height: int = 48
inspector_width: int = 260
properties_width: int = 320
console_height: int = 120
# Simulation core config
simulation: SimulationConfig = field(default_factory=lambda: SimulationConfig(
grid_width=GRID_WIDTH,
grid_height=GRID_HEIGHT,
cell_size=CELL_SIZE,
initial_cells=50,
initial_food=FOOD_OBJECTS_COUNT,
food_spawning=FOOD_SPAWNING,
random_seed=RANDOM_SEED,
default_tps=DEFAULT_TPS
))
@dataclass
class ExperimentConfig:
"""Configuration for automated experiments."""
name: str = "experiment"
description: str = ""
# Multiple runs
runs: int = 1
run_duration: Optional[float] = None # seconds per run
run_ticks: Optional[int] = None # ticks per run
# Variables to test (for parameter sweeps)
variables: dict = field(default_factory=dict)
# Base configuration
base_config: HeadlessConfig = field(default_factory=HeadlessConfig)
# Output aggregation
aggregate_results: bool = True
aggregate_format: str = "csv"

21
core/__init__.py Normal file
View File

@ -0,0 +1,21 @@
"""Core simulation infrastructure and engine components."""
from .simulation_core import SimulationCore, SimulationState
from .timing import TimingController, TimingState
from .event_bus import EventBus, EventType, Event
from .simulation_engine import SimulationEngine
from .input_handler import InputHandler
from .renderer import Renderer
__all__ = [
'SimulationCore',
'SimulationState',
'TimingController',
'TimingState',
'EventBus',
'EventType',
'Event',
'SimulationEngine',
'InputHandler',
'Renderer'
]

77
core/event_bus.py Normal file
View File

@ -0,0 +1,77 @@
"""Simple event bus for decoupled component communication."""
from typing import Dict, List, Callable, Any
from dataclasses import dataclass
from enum import Enum
class EventType(Enum):
"""Types of simulation events."""
SIMULATION_STATE_CHANGED = "simulation_state_changed"
WORLD_TICK_COMPLETED = "world_tick_completed"
ENTITY_ADDED = "entity_added"
ENTITY_REMOVED = "entity_removed"
SELECTION_CHANGED = "selection_changed"
TIMING_UPDATE = "timing_update"
@dataclass
class Event:
"""Event data structure."""
type: EventType
data: Dict[str, Any]
timestamp: float = None
def __post_init__(self):
import time
if self.timestamp is None:
self.timestamp = time.time()
class EventBus:
"""Simple event bus for decoupled communication."""
def __init__(self):
self._subscribers: Dict[EventType, List[Callable]] = {}
self._event_history: List[Event] = []
self._max_history = 1000
def subscribe(self, event_type: EventType, callback: Callable[[Event], None]):
"""Subscribe to an event type."""
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(callback)
def unsubscribe(self, event_type: EventType, callback: Callable[[Event], None]):
"""Unsubscribe from an event type."""
if event_type in self._subscribers:
try:
self._subscribers[event_type].remove(callback)
except ValueError:
pass
def publish(self, event: Event):
"""Publish an event to all subscribers."""
# Store in history
self._event_history.append(event)
if len(self._event_history) > self._max_history:
self._event_history.pop(0)
# Notify subscribers
if event.type in self._subscribers:
for callback in self._subscribers[event.type]:
try:
callback(event)
except Exception as e:
print(f"Error in event callback: {e}")
def get_recent_events(self, event_type: EventType = None, count: int = 10) -> List[Event]:
"""Get recent events, optionally filtered by type."""
events = self._event_history
if event_type:
events = [e for e in events if e.type == event_type]
return events[-count:]
def clear_history(self):
"""Clear event history."""
self._event_history.clear()

View File

@ -1,35 +1,42 @@
# core/input_handler.py
"""Handles all input events and camera controls."""
"""Handles input events and camera controls - no state management."""
import pygame
from config.constants import *
class InputHandler:
"""Pure input handler - processes input without managing simulation state."""
def __init__(self, camera, world, sim_view_rect):
self.camera = camera
self.world = world
# Selection state
# Selection state (input-specific, not simulation state)
self.selecting = False
self.select_start = None
self.select_end = None
self.selected_objects = []
# UI state flags
# UI display flags (input-controlled visual settings)
self.show_grid = True
self.show_interaction_radius = False
self.show_legend = False
self.is_paused = False
# Speed control
# Simulation state references (synchronized from external source)
self.tps = DEFAULT_TPS
self.default_tps = DEFAULT_TPS
self.is_paused = False
self.sprint_mode = False
self.is_stepping = False
self.speed_multiplier = 1.0
# sim-view rect for mouse position calculations
self.sim_view_rect = sim_view_rect
# Action callbacks for simulation control
self.action_callbacks = {}
def update_sim_view_rect(self, sim_view_rect):
"""Update the sim_view rectangle."""
self.sim_view_rect = sim_view_rect
@ -79,20 +86,24 @@ class InputHandler:
elif event.key == pygame.K_l:
self.show_legend = not self.show_legend
elif event.key == pygame.K_SPACE:
self.is_paused = not self.is_paused
self.toggle_pause()
elif event.key == pygame.K_LSHIFT:
self.tps = self.default_tps * TURBO_MULTIPLIER
# Left Shift for temporary speed boost (turbo mode)
self.set_speed_multiplier(2.0)
elif event.key == pygame.K_r:
self.camera.reset_position()
elif event.key == pygame.K_RSHIFT:
self.sprint_mode = not self.sprint_mode # Enter sprint mode
self.toggle_sprint_mode() # Right Shift toggles sprint mode
elif event.key == pygame.K_s:
self.step_forward() # Step forward
return running
def _handle_keyup(self, event):
"""Handle keyup events."""
if event.key == pygame.K_LSHIFT:
self.tps = self.default_tps
# Reset speed multiplier when Left Shift is released
self.set_speed_multiplier(1.0)
# if event.key == pygame.K_RSHIFT:
# self.sprint_mode = False # Exit sprint mode
@ -181,4 +192,47 @@ class InputHandler:
width = abs(self.select_end[0] - self.select_start[0])
height = abs(self.select_end[1] - self.select_start[1])
return (left, top, width, height)
return None
return None
def set_action_callback(self, action_name: str, callback):
"""Set callback for simulation control actions."""
self.action_callbacks[action_name] = callback
def toggle_pause(self):
"""Toggle pause state via callback."""
if 'toggle_pause' in self.action_callbacks:
self.action_callbacks['toggle_pause']()
def step_forward(self):
"""Execute single simulation step via callback."""
self.is_stepping = True
if 'step_forward' in self.action_callbacks:
self.action_callbacks['step_forward']()
def set_speed_multiplier(self, multiplier):
"""Set speed multiplier for simulation via callback."""
if 'set_speed' in self.action_callbacks:
self.action_callbacks['set_speed'](multiplier)
def set_custom_tps(self, tps):
"""Set custom TPS value via callback."""
if 'set_custom_tps' in self.action_callbacks:
self.action_callbacks['set_custom_tps'](tps)
def toggle_sprint_mode(self):
"""Toggle sprint mode via callback."""
if 'toggle_sprint' in self.action_callbacks:
self.action_callbacks['toggle_sprint']()
def get_current_speed_display(self):
"""Get current speed display string."""
if self.sprint_mode:
return "Sprint"
elif self.is_paused:
return "Paused"
elif self.speed_multiplier == 1.0:
return "1x"
elif self.speed_multiplier in [0.5, 2.0, 4.0, 8.0]:
return f"{self.speed_multiplier}x"
else:
return f"{self.speed_multiplier:.1f}x"

310
core/simulation_core.py Normal file
View File

@ -0,0 +1,310 @@
"""Pure simulation core with no UI dependencies."""
import time
import random
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from world.world import World, Position, Rotation
from world.simulation_interface import Camera
from .event_bus import EventBus, EventType, Event
from .timing import TimingController
from config.constants import *
from config.simulation_config import SimulationConfig
@dataclass
class SimulationState:
"""Current simulation state."""
total_ticks: int = 0
actual_tps: float = 0.0
is_running: bool = False
world_buffer: int = 0
class SimulationCore:
"""Pure simulation logic with no UI dependencies."""
def __init__(self, config: SimulationConfig, event_bus: Optional[EventBus] = None):
self.config = config
self.event_bus = event_bus or EventBus()
# Initialize timing controller
self.timing = TimingController(config.default_tps, self.event_bus)
# Initialize world
self.world = self._setup_world()
# Simulation state
self.state = SimulationState()
self.is_running = False
# Camera (for spatial calculations, not rendering)
self.camera = Camera(
config.grid_width * config.cell_size,
config.grid_height * config.cell_size,
RENDER_BUFFER
)
def _setup_world(self) -> World:
"""Setup the simulation world."""
# Import locally to avoid circular imports
from world.objects import DefaultCell, FoodObject
world = World(
self.config.cell_size,
(
self.config.cell_size * self.config.grid_width,
self.config.cell_size * self.config.grid_height
)
)
# Set random seed for reproducibility
random.seed(self.config.random_seed)
# Calculate world bounds
half_width = self.config.grid_width * self.config.cell_size // 2
half_height = self.config.grid_height * self.config.cell_size // 2
# Add initial food
if self.config.food_spawning:
for _ in range(self.config.initial_food):
x = random.randint(-half_width // 2, half_width // 2)
y = random.randint(-half_height // 2, half_height // 2)
food = FoodObject(Position(x=x, y=y))
world.add_object(food)
self._notify_entity_added(food)
# Add initial cells
for _ in range(self.config.initial_cells):
cell = DefaultCell(
Position(
x=random.randint(-half_width // 2, half_width // 2),
y=random.randint(-half_height // 2, half_height // 2)
),
Rotation(angle=0)
)
# Mutate the initial behavioral model for variety
cell.behavioral_model = cell.behavioral_model.mutate(3)
world.add_object(cell)
self._notify_entity_added(cell)
return world
def start(self):
"""Start the simulation."""
self.is_running = True
self.state.is_running = True
self.timing.reset_counters()
def stop(self):
"""Stop the simulation."""
self.is_running = False
self.state.is_running = False
def pause(self):
"""Pause the simulation."""
self.timing.set_pause(True)
def resume(self):
"""Resume the simulation."""
self.timing.set_pause(False)
def step(self):
"""Execute a single simulation step."""
was_paused = self.timing.state.is_paused
self.timing.set_pause(False)
self._tick()
self.timing.set_pause(was_paused)
def update(self, delta_time: float):
"""Update simulation timing and tick if needed."""
if not self.is_running:
return
# Handle sprint mode
if self.timing.state.sprint_mode:
self._handle_sprint_mode()
return
# Handle single step
if hasattr(self, '_stepping') and self._stepping:
self._tick()
self._stepping = False
return
# Normal timing-based ticks
if self.timing.should_tick():
self._tick()
# Update timing
current_time = time.perf_counter()
if current_time - self.timing.last_tps_time >= 1.0:
self.state.actual_tps = self.timing.actual_tps
self.state.total_ticks = self.timing.total_ticks
def _tick(self):
"""Execute a single simulation tick."""
# Update world
self.world.tick_all()
# Update timing
self.timing.update_timing()
# Update state
self.state.total_ticks = self.timing.total_ticks
self.state.actual_tps = self.timing.actual_tps
self.state.world_buffer = self.world.current_buffer
# Notify subscribers
self._notify_world_tick_completed()
def _handle_sprint_mode(self):
"""Handle sprint mode execution."""
if not self.timing.state.sprint_mode:
return
start_time = time.perf_counter()
# Execute multiple ticks in sprint mode
while time.perf_counter() - start_time < 0.05: # 50ms of sprint
self._tick()
def set_tps(self, tps: float):
"""Set target TPS."""
self.timing.set_tps(tps)
def set_speed_multiplier(self, multiplier: float):
"""Set speed multiplier."""
self.timing.set_speed_multiplier(multiplier)
def toggle_pause(self):
"""Toggle pause state."""
self.timing.toggle_pause()
def toggle_sprint_mode(self):
"""Toggle sprint mode."""
self.timing.toggle_sprint_mode()
def get_entities_in_radius(self, position: Position, radius: float) -> List:
"""Get entities within radius of position."""
return self.world.query_objects_within_radius(position.x, position.y, radius)
def get_entities_in_range(self, min_pos: Position, max_pos: Position) -> List:
"""Get entities within rectangular range."""
return self.world.query_objects_in_range(min_pos.x, min_pos.y, max_pos.x, max_pos.y)
def get_closest_entity(self, position: Position, max_distance: float = None) -> Optional:
"""Get closest entity to position."""
if max_distance is not None:
# Filter by distance after getting closest
closest = self.world.query_closest_object(position.x, position.y)
if closest:
dx = closest.position.x - position.x
dy = closest.position.y - position.y
distance = (dx * dx + dy * dy) ** 0.5
if distance <= max_distance:
return closest
return None
else:
return self.world.query_closest_object(position.x, position.y)
def count_entities_by_type(self, entity_type) -> int:
"""Count entities of specific type."""
count = 0
for entity in self.world.get_objects():
if isinstance(entity, entity_type):
count += 1
return count
def get_world_state(self) -> Dict[str, Any]:
"""Get current world state for data collection."""
# Import locally to avoid circular imports
from world.objects import DefaultCell, FoodObject
return {
'tick_count': self.state.total_ticks,
'actual_tps': self.state.actual_tps,
'world_buffer': self.state.world_buffer,
'is_paused': self.timing.state.is_paused,
'sprint_mode': self.timing.state.sprint_mode,
'target_tps': self.timing.state.tps,
'speed_multiplier': self.timing.state.speed_multiplier,
'entity_counts': {
'total': len(list(self.world.get_objects())),
'cells': self.count_entities_by_type(DefaultCell),
'food': self.count_entities_by_type(FoodObject)
}
}
def get_entity_states(self) -> List[Dict[str, Any]]:
"""Get states of all entities for data collection."""
# Import locally to avoid circular imports
from world.objects import DefaultCell, FoodObject
entities = []
for entity in self.world.get_objects():
if isinstance(entity, DefaultCell):
# Get neural network info safely
nn = entity.behavioral_model.neural_network if hasattr(entity.behavioral_model, 'neural_network') else None
layer_sizes = [len(nn.layers)] if nn and hasattr(nn, 'layers') else [4, 6, 2] # Default estimate
entities.append({
'id': id(entity),
'type': 'cell',
'position': {'x': entity.position.x, 'y': entity.position.y},
'rotation': entity.rotation.angle,
'energy': entity.energy,
'age': entity.tick_count, # tick_count represents age
'generation': getattr(entity, 'generation', 0), # Default to 0 if not present
'neural_network': {
'layer_sizes': layer_sizes,
'has_neural_network': nn is not None
}
})
elif isinstance(entity, FoodObject):
entities.append({
'id': id(entity),
'type': 'food',
'position': {'x': entity.position.x, 'y': entity.position.y},
'decay': entity.decay,
'max_decay': entity.max_decay
})
return entities
def _notify_world_tick_completed(self):
"""Notify subscribers that world tick completed."""
if self.event_bus:
event = Event(
type=EventType.WORLD_TICK_COMPLETED,
data={
'tick_count': self.state.total_ticks,
'world_state': self.get_world_state()
}
)
self.event_bus.publish(event)
def _notify_entity_added(self, entity):
"""Notify subscribers of entity addition."""
if self.event_bus:
event = Event(
type=EventType.ENTITY_ADDED,
data={
'entity_id': id(entity),
'entity_type': type(entity).__name__,
'position': {'x': entity.position.x, 'y': entity.position.y}
}
)
self.event_bus.publish(event)
def _notify_entity_removed(self, entity):
"""Notify subscribers of entity removal."""
if self.event_bus:
event = Event(
type=EventType.ENTITY_REMOVED,
data={
'entity_id': id(entity),
'entity_type': type(entity).__name__
}
)
self.event_bus.publish(event)

View File

@ -1,28 +1,40 @@
import pygame
import time
import random
import sys
from pygame_gui import UIManager
from world.base.brain import CellBrain, FlexibleNeuralNetwork
from world.world import World, Position, Rotation
from world.objects import FoodObject, DefaultCell
from world.simulation_interface import Camera
from config.constants import *
from core.input_handler import InputHandler
from core.renderer import Renderer
from core.simulation_core import SimulationCore, SimulationConfig
from core.event_bus import EventBus
from ui.hud import HUD
import cProfile
import pstats
class SimulationEngine:
"""Interactive simulation engine with UI (wrapper around SimulationCore)."""
def __init__(self):
pygame.init()
self.event_bus = EventBus()
self._init_window()
self._init_ui()
self._init_simulation()
self._init_ui()
self.running = True
def _profile_single_tick(self):
"""Profile a single tick for performance analysis."""
profiler = cProfile.Profile()
profiler.enable()
self.simulation_core.world.tick_all()
profiler.disable()
profiler.dump_stats('profile_tick.prof') # Save to file
def _init_window(self):
info = pygame.display.Info()
self.window_width = int(info.current_w // 1.5)
@ -41,16 +53,48 @@ class SimulationEngine:
self._update_simulation_view()
def _init_simulation(self):
self.last_tick_time = time.perf_counter()
self.last_tps_time = time.perf_counter()
self.tick_counter = 0
self.actual_tps = 0
self.total_ticks = 0
# Initialize default sim view rect (will be updated by _init_ui)
self.sim_view_width = self.window_width - 400 # Rough estimate for inspector width
self.sim_view_height = self.window_height - 200 # Rough estimate for control bar height
self.sim_view = pygame.Surface((self.sim_view_width, self.sim_view_height))
self.sim_view_rect = self.sim_view.get_rect(topleft=(200, 48)) # Rough estimate
self.world = self._setup_world()
self.input_handler = InputHandler(self.camera, self.world, self.sim_view_rect)
# Create simulation core
sim_config = SimulationConfig(
grid_width=GRID_WIDTH,
grid_height=GRID_HEIGHT,
cell_size=CELL_SIZE,
initial_cells=50,
initial_food=FOOD_OBJECTS_COUNT,
food_spawning=FOOD_SPAWNING,
random_seed=RANDOM_SEED,
default_tps=DEFAULT_TPS
)
self.simulation_core = SimulationCore(sim_config, self.event_bus)
# Setup input handler with simulation core world
self.input_handler = InputHandler(
self.simulation_core.camera,
self.simulation_core.world,
self.sim_view_rect
)
self.input_handler.tps = self.simulation_core.timing.state.tps
self.input_handler.default_tps = DEFAULT_TPS
# Set up action callbacks for input handler
self.input_handler.set_action_callback('toggle_pause', self.simulation_core.toggle_pause)
self.input_handler.set_action_callback('step_forward', self.simulation_core.step)
self.input_handler.set_action_callback('set_speed', self.simulation_core.set_speed_multiplier)
self.input_handler.set_action_callback('set_custom_tps', self.simulation_core.set_tps)
self.input_handler.set_action_callback('toggle_sprint', self.simulation_core.toggle_sprint_mode)
# Setup renderer
self.renderer = Renderer(self.sim_view)
# Profile a single tick for performance analysis
self._profile_single_tick()
def _update_simulation_view(self):
viewport_rect = self.hud.get_viewport_rect()
self.sim_view_width = viewport_rect.width
@ -61,170 +105,189 @@ class SimulationEngine:
self.ui_manager.set_window_resolution((self.window_width, self.window_height))
self.renderer = Renderer(self.sim_view)
if hasattr(self, 'camera'):
self.camera.screen_width = self.sim_view_width
self.camera.screen_height = self.sim_view_height
# Update simulation core camera dimensions
self.simulation_core.camera.screen_width = self.sim_view_width
self.simulation_core.camera.screen_height = self.sim_view_height
if hasattr(self, 'input_handler'):
self.input_handler.update_sim_view_rect(self.sim_view_rect)
if not hasattr(self, 'camera'):
self.camera = Camera(self.sim_view_width, self.sim_view_height, RENDER_BUFFER)
@staticmethod
def _setup_world():
world = World(CELL_SIZE, (CELL_SIZE * GRID_WIDTH, CELL_SIZE * GRID_HEIGHT))
random.seed(RANDOM_SEED)
half_width = GRID_WIDTH * CELL_SIZE // 2
half_height = GRID_HEIGHT * CELL_SIZE // 2
if FOOD_SPAWNING:
for _ in range(FOOD_OBJECTS_COUNT):
x = random.randint(-half_width // 2, half_width // 2)
y = random.randint(-half_height // 2, half_height // 2)
world.add_object(FoodObject(Position(x=x, y=y)))
for _ in range(350):
new_cell = DefaultCell(
Position(x=random.randint(-half_width // 2, half_width // 2), y=random.randint(-half_height // 2, half_height // 2)),
Rotation(angle=0)
)
new_cell.behavioral_model = new_cell.behavioral_model.mutate(3)
world.add_object(new_cell)
return world
# Update input handler simulation view rect
self.input_handler.update_sim_view_rect(self.sim_view_rect)
def _count_cells(self):
count = 0
for entity in self.world.get_objects():
if isinstance(entity, DefaultCell):
count += 1
return count
"""Count cells in the simulation."""
# Import locally to avoid circular import
from world.objects import DefaultCell
return self.simulation_core.count_entities_by_type(DefaultCell)
def run(self):
print(self.world.current_buffer)
"""Run the interactive simulation engine."""
print(f"World buffer: {self.simulation_core.world.current_buffer}")
self.simulation_core.start()
while self.running:
self._handle_frame()
self.simulation_core.stop()
pygame.quit()
sys.exit()
def _handle_frame(self):
"""Handle a single frame in the interactive simulation."""
deltatime = self.clock.get_time() / 1000.0
tick_interval = 1.0 / self.input_handler.tps
# Handle events
events = pygame.event.get()
self.running = self.input_handler.handle_events(events, self.hud.manager)
self._handle_window_events(events)
# Process HUD events and window events
for event in events:
hud_action = self.hud.process_event(event)
self._handle_hud_actions(hud_action)
if event.type == pygame.VIDEORESIZE:
self._handle_window_resize(event)
# Sync input handler state with simulation core timing
self._sync_input_and_timing()
# Handle sprint mode
if self.input_handler.sprint_mode:
self._handle_sprint_mode()
return
# Only process one tick per frame if enough time has passed
if not self.input_handler.is_paused:
current_time = time.perf_counter()
if current_time - self.last_tick_time >= tick_interval:
self.last_tick_time += tick_interval
self.tick_counter += 1
self.total_ticks += 1
self.input_handler.update_selected_objects()
self.world.tick_all()
self.hud.manager.update(deltatime)
if current_time - self.last_tps_time >= 1.0:
self.actual_tps = self.tick_counter
self.tick_counter = 0
self.last_tps_time += 1.0
else:
self.last_tick_time = time.perf_counter()
self.last_tps_time = time.perf_counter()
# Update UI manager every frame
self.hud.manager.update(deltatime)
self.hud.manager.draw_ui(self.screen)
# Handle step-forward mode
if self.input_handler.is_stepping:
self.simulation_core.step()
self.input_handler.is_stepping = False
else:
# Update simulation using core
self.simulation_core.update(deltatime)
# Update selected objects in input handler
self.input_handler.update_selected_objects()
# Render frame
self._update(deltatime)
self._render()
def _handle_window_events(self, events):
for event in events:
self.hud.process_event(event)
if event.type == pygame.VIDEORESIZE:
self.window_width, self.window_height = event.w, event.h
self.screen = pygame.display.set_mode(
(self.window_width, self.window_height),
pygame.RESIZABLE
)
self._update_simulation_view()
self.hud.update_layout(self.window_width, self.window_height)
def _sync_input_and_timing(self):
"""Synchronize input handler state with simulation core timing."""
timing_state = self.simulation_core.timing.state
self.hud.update_layout(self.window_width, self.window_height)
# Sync TPS
self.input_handler.tps = timing_state.tps
# Sync pause state
self.input_handler.is_paused = timing_state.is_paused
# Sync sprint mode
self.input_handler.sprint_mode = timing_state.sprint_mode
# Sync speed multiplier
self.input_handler.speed_multiplier = timing_state.speed_multiplier
def _handle_window_resize(self, event):
"""Handle window resize event."""
self.window_width, self.window_height = event.w, event.h
self.screen = pygame.display.set_mode(
(self.window_width, self.window_height),
pygame.RESIZABLE
)
self._update_simulation_view()
self.hud.update_layout(self.window_width, self.window_height)
def _handle_hud_actions(self, action):
"""Handle actions from HUD simulation controls by forwarding to simulation core."""
if action == 'toggle_pause':
self.simulation_core.toggle_pause()
elif action == 'step_forward':
self.simulation_core.step()
elif action == 'toggle_sprint':
self.simulation_core.toggle_sprint_mode()
elif isinstance(action, tuple) and action[0] == 'set_speed':
self.simulation_core.set_speed_multiplier(action[1])
elif isinstance(action, tuple) and action[0] == 'set_custom_tps':
self.simulation_core.set_tps(action[1])
elif isinstance(action, tuple) and action[0] == 'reset_tps_display':
# Reset TPS display to current value
if self.hud.custom_tps_entry:
self.hud.custom_tps_entry.set_text(str(int(self.simulation_core.timing.state.tps)))
def _handle_sprint_mode(self):
"""Handle sprint mode by running multiple simulation ticks quickly."""
current_time = time.perf_counter()
while True:
while time.perf_counter() - current_time < 0.05: # 50ms of sprint
self.simulation_core.update(0.016) # Update simulation
self.input_handler.update_selected_objects()
self.world.tick_all()
self.tick_counter += 1
self.total_ticks += 1
pygame.event.pump() # Prevent event queue overflow
if time.perf_counter() - current_time > 0.05:
break
if time.perf_counter() - self.last_tps_time >= 1.0:
self.actual_tps = self.tick_counter
self.tick_counter = 0
self.last_tps_time = time.perf_counter()
# Render sprint debug info
self.screen.fill(BLACK)
self.renderer.clear_screen()
cell_count = self._count_cells()
self.hud.render_sprint_debug(self.screen, self.actual_tps, self.total_ticks, cell_count)
self.hud.render_sprint_debug(
self.screen,
self.simulation_core.state.actual_tps,
self.simulation_core.state.total_ticks,
cell_count
)
pygame.display.flip()
self.clock.tick(MAX_FPS)
self.last_tick_time = time.perf_counter()
def _handle_simulation_ticks(self, tick_interval, deltatime):
current_time = time.perf_counter()
while current_time - self.last_tick_time >= tick_interval:
self.last_tick_time += tick_interval
self.tick_counter += 1
self.total_ticks += 1
self.input_handler.update_selected_objects()
self.world.tick_all()
self.hud.manager.update(deltatime)
if current_time - self.last_tps_time >= 1.0:
self.actual_tps = self.tick_counter
self.tick_counter = 0
self.last_tps_time += 1.0
def _update(self, deltatime):
"""Update camera based on input."""
keys = pygame.key.get_pressed()
self.input_handler.update_camera(keys, deltatime)
def _render(self):
"""Render the simulation frame."""
self.screen.fill(BLACK)
self.renderer.clear_screen()
if not self.hud.dragging_splitter:
self.renderer.draw_grid(self.camera, self.input_handler.show_grid)
self.renderer.render_world(self.world, self.camera)
# Render world
self.renderer.draw_grid(self.simulation_core.camera, self.input_handler.show_grid)
self.renderer.render_world(self.simulation_core.world, self.simulation_core.camera)
self.renderer.render_interaction_radius(
self.world, self.camera, self.input_handler.selected_objects, self.input_handler.show_interaction_radius
self.simulation_core.world,
self.simulation_core.camera,
self.input_handler.selected_objects,
self.input_handler.show_interaction_radius
)
self.renderer.render_selection_rectangle(
self.input_handler.get_selection_rect(),
self.sim_view_rect
)
self.renderer.render_selected_objects_outline(
self.input_handler.selected_objects,
self.simulation_core.camera
)
self.renderer.render_selection_rectangle(self.input_handler.get_selection_rect(), self.sim_view_rect)
self.renderer.render_selected_objects_outline(self.input_handler.selected_objects, self.camera)
self.screen.blit(self.sim_view, (self.sim_view_rect.left, self.sim_view_rect.top))
# Update HUD displays with simulation core state
self.hud.update_simulation_controls(self.simulation_core)
# Draw UI elements
self.hud.manager.draw_ui(self.screen)
self.hud.draw_splitters(self.screen)
# self.hud.render_mouse_position(self.screen, self.camera, self.sim_view_rect)
# Render HUD overlays
self.hud.render_fps(self.screen, self.clock)
self.hud.render_tps(self.screen, self.actual_tps)
# self.hud.render_tick_count(self.screen, self.total_ticks)
# self.hud.render_selected_objects_info(self.screen, self.input_handler.selected_objects)
self.hud.render_tps(self.screen, self.simulation_core.state.actual_tps)
self.hud.render_selected_objects_info(self.screen, self.input_handler.selected_objects)
self.hud.render_legend(self.screen, self.input_handler.show_legend)
self.hud.render_pause_indicator(self.screen, self.input_handler.is_paused)
self.hud.render_pause_indicator(self.screen, self.simulation_core.timing.state.is_paused)
# Render neural network visualization for selected object
if self.input_handler.selected_objects:
self.hud.render_neural_network_visualization(self.screen, self.input_handler.selected_objects[0])
self.hud.render_neural_network_visualization(
self.screen,
self.input_handler.selected_objects[0]
)
pygame.display.flip()
self.clock.tick(MAX_FPS)

186
core/timing.py Normal file
View File

@ -0,0 +1,186 @@
"""Timing and TPS management for simulation."""
import time
from dataclasses import dataclass
from typing import Optional
from .event_bus import EventBus, EventType, Event
@dataclass
class TimingState:
"""Current timing state."""
tps: float
is_paused: bool = False
sprint_mode: bool = False
speed_multiplier: float = 1.0
def __post_init__(self):
if self.speed_multiplier == 0:
self.speed_multiplier = 1.0
class TimingController:
"""Manages simulation timing, TPS, and pause states."""
def __init__(self, default_tps: float = 40.0, event_bus: Optional[EventBus] = None):
self.default_tps = default_tps
self.state = TimingState(tps=default_tps)
self.event_bus = event_bus
# Timing variables
self.last_tick_time = time.perf_counter()
self.last_tps_time = time.perf_counter()
self.tick_counter = 0
self.total_ticks = 0
self.actual_tps = 0
# Sprint mode specific
self.sprint_start_time = None
def set_tps(self, tps: float):
"""Set target TPS."""
if tps > 0:
self.state.tps = max(1.0, min(1000.0, tps))
self.state.speed_multiplier = self.state.tps / self.default_tps
self._notify_state_change()
def set_speed_multiplier(self, multiplier: float):
"""Set speed multiplier for simulation."""
if multiplier > 0:
self.state.speed_multiplier = max(0.1, min(10.0, multiplier))
self.state.tps = self.default_tps * self.state.speed_multiplier
self._notify_state_change()
def toggle_pause(self):
"""Toggle pause state."""
self.state.is_paused = not self.state.is_paused
if self.state.is_paused:
# Reset timing when paused
self.last_tick_time = time.perf_counter()
self.last_tps_time = time.perf_counter()
self._notify_state_change()
def set_pause(self, paused: bool):
"""Set pause state directly."""
if self.state.is_paused != paused:
self.toggle_pause()
def toggle_sprint_mode(self):
"""Toggle sprint mode."""
self.state.sprint_mode = not self.state.sprint_mode
if self.state.sprint_mode:
self.sprint_start_time = time.perf_counter()
else:
self.sprint_start_time = None
self._notify_state_change()
def set_sprint_mode(self, enabled: bool):
"""Set sprint mode directly."""
if self.state.sprint_mode != enabled:
self.toggle_sprint_mode()
def should_tick(self) -> bool:
"""Check if simulation should tick based on timing."""
if self.state.is_paused:
return False
if self.state.sprint_mode:
return True
tick_interval = 1.0 / self.state.tps
current_time = time.perf_counter()
return current_time - self.last_tick_time >= tick_interval
def update_timing(self):
"""Update timing variables after a tick."""
current_time = time.perf_counter()
# Update tick counters first
self.tick_counter += 1
self.total_ticks += 1
if self.state.sprint_mode and self.sprint_start_time:
# In sprint mode, calculate TPS based on total sprint duration
sprint_duration = current_time - self.sprint_start_time
if sprint_duration > 0:
self.actual_tps = self.tick_counter / sprint_duration
else:
self.actual_tps = 0
else:
# Normal mode TPS calculation using sliding window for more responsive display
elapsed_time = current_time - self.last_tps_time
# Update TPS more frequently for responsive display
if elapsed_time >= 0.5: # Update every 500ms instead of 1 second
# Calculate actual TPS based on elapsed time
self.actual_tps = self.tick_counter / elapsed_time
# Reset for next measurement period
self.last_tps_time = current_time
self.tick_counter = 0
# Update last tick time for next tick calculation - advance by tick interval
if not self.state.sprint_mode:
tick_interval = 1.0 / self.state.tps
self.last_tick_time += tick_interval
self._notify_timing_update()
def get_display_tps(self) -> int:
"""Get TPS rounded to nearest whole number for display."""
return round(self.actual_tps)
def reset_counters(self):
"""Reset timing counters."""
self.tick_counter = 0
self.total_ticks = 0
self.actual_tps = 0
self.last_tick_time = time.perf_counter()
self.last_tps_time = time.perf_counter()
if self.state.sprint_mode:
self.sprint_start_time = time.perf_counter()
def get_tick_interval(self) -> float:
"""Get current tick interval in seconds."""
return 1.0 / self.state.tps if self.state.tps > 0 else 1.0
def get_current_speed_display(self) -> str:
"""Get current speed display string."""
if self.state.sprint_mode:
return "Sprint"
elif self.state.is_paused:
return "Paused"
elif self.state.speed_multiplier == 1.0:
return "1x"
elif self.state.speed_multiplier in [0.5, 2.0, 4.0, 8.0]:
return f"{self.state.speed_multiplier}x"
else:
return f"{self.state.speed_multiplier:.1f}x"
def _notify_state_change(self):
"""Notify subscribers of state change."""
if self.event_bus:
event = Event(
type=EventType.SIMULATION_STATE_CHANGED,
data={
'timing_state': self.state,
'tps': self.state.tps,
'is_paused': self.state.is_paused,
'sprint_mode': self.state.sprint_mode,
'speed_multiplier': self.state.speed_multiplier
}
)
self.event_bus.publish(event)
def _notify_timing_update(self):
"""Notify subscribers of timing update."""
if self.event_bus:
event = Event(
type=EventType.TIMING_UPDATE,
data={
'actual_tps': self.actual_tps,
'total_ticks': self.total_ticks,
'tick_counter': self.tick_counter
}
)
self.event_bus.publish(event)

5
engines/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""Simulation engines for different modes."""
from .headless_engine import HeadlessSimulationEngine, HeadlessConfig
__all__ = ['HeadlessSimulationEngine', 'HeadlessConfig']

274
engines/headless_engine.py Normal file
View File

@ -0,0 +1,274 @@
"""Headless simulation engine for running simulations without UI."""
import time
import signal
import sys
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from core.simulation_core import SimulationCore, SimulationConfig
from core.event_bus import EventBus
from output import MetricsCollector, EntityCollector, EvolutionCollector
from output.formatters.json_formatter import JSONFormatter
from output.formatters.csv_formatter import CSVFormatter
from output.writers.file_writer import FileWriter
@dataclass
class HeadlessConfig:
"""Configuration for headless simulation engine."""
simulation: SimulationConfig
max_ticks: Optional[int] = None
max_duration: Optional[float] = None # seconds
output_dir: str = "simulation_output"
enable_metrics: bool = True
enable_entities: bool = True
enable_evolution: bool = True
metrics_interval: int = 100
entities_interval: int = 1000
evolution_interval: int = 1000
output_formats: List[str] = None # ['json', 'csv']
real_time: bool = False # Whether to run in real-time or as fast as possible
def __post_init__(self):
if self.output_formats is None:
self.output_formats = ['json']
class HeadlessSimulationEngine:
"""Headless simulation engine with data collection capabilities."""
def __init__(self, config: HeadlessConfig):
self.config = config
self.event_bus = EventBus()
self.simulation_core = SimulationCore(config.simulation, self.event_bus)
self.file_writer = FileWriter(config.output_dir)
self.formatters = self._create_formatters()
self.collectors = self._create_collectors()
# Runtime state
self.running = False
self.start_time = None
self.tick_data = {}
self.batch_data = {
'metrics': [],
'entities': [],
'evolution': []
}
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _create_formatters(self) -> Dict[str, Any]:
"""Create data formatters."""
formatters = {}
if 'json' in self.config.output_formats:
formatters['json'] = JSONFormatter()
if 'csv' in self.config.output_formats:
formatters['csv'] = CSVFormatter()
return formatters
def _create_collectors(self) -> Dict[str, Any]:
"""Create data collectors."""
collectors = {}
if self.config.enable_metrics:
collectors['metrics'] = MetricsCollector(self.config.metrics_interval)
if self.config.enable_entities:
collectors['entities'] = EntityCollector(
self.config.entities_interval,
include_cells=True,
include_food=False
)
if self.config.enable_evolution:
collectors['evolution'] = EvolutionCollector(self.config.evolution_interval)
return collectors
def run(self) -> Dict[str, Any]:
"""Run the headless simulation."""
# Determine if we should run at max speed
max_speed_mode = not self.config.real_time and self.config.simulation.default_tps >= 1000
print(f"Starting headless simulation...")
print(f"Output directory: {self.config.output_dir}")
print(f"Max ticks: {self.config.max_ticks or 'unlimited'}")
print(f"Max duration: {self.config.max_duration or 'unlimited'} seconds")
print(f"Real-time mode: {self.config.real_time}")
print(f"Speed mode: {'Maximum speed' if max_speed_mode else f'{self.config.simulation.default_tps} TPS'}")
print(f"Output formats: {', '.join(self.config.output_formats)}")
print(f"Collectors: {', '.join(self.collectors.keys())}")
print()
self.running = True
self.start_time = time.time()
self.simulation_core.start()
# Enable sprint mode for maximum speed if not real-time mode
if max_speed_mode:
self.simulation_core.timing.set_sprint_mode(True)
print("Running at maximum speed (sprint mode enabled)")
last_batch_time = time.time()
batch_interval = 5.0 # Write batch data every 5 seconds
try:
while self.running:
# Check termination conditions
if self._should_terminate():
break
# Update simulation
if max_speed_mode:
# In max speed mode, update as fast as possible
self.simulation_core.update(0.0)
else:
# Normal timing-based updates
self.simulation_core.update(0.016) # ~60 FPS equivalent
# Collect data
self._collect_data()
# Write batch data periodically
if time.time() - last_batch_time >= batch_interval:
self._write_batch_data()
last_batch_time = time.time()
# Real-time delay if needed
if self.config.real_time:
time.sleep(0.016) # ~60 FPS
except KeyboardInterrupt:
print("\nSimulation interrupted by user")
except Exception as e:
print(f"Simulation error: {e}")
import traceback
traceback.print_exc()
finally:
self._finalize()
return self._get_summary()
def _should_terminate(self) -> bool:
"""Check if simulation should terminate."""
# Check max ticks
if self.config.max_ticks and self.simulation_core.state.total_ticks >= self.config.max_ticks:
print(f"Reached max ticks: {self.config.max_ticks}")
return True
# Check max duration
if self.config.max_duration:
elapsed = time.time() - self.start_time
if elapsed >= self.config.max_duration:
print(f"Reached max duration: {self.config.max_duration} seconds")
return True
return False
def _collect_data(self):
"""Collect data from all collectors."""
for collector_name, collector in self.collectors.items():
data_list = collector.update(self.simulation_core)
for data in data_list:
self.batch_data[collector_name].append(data)
def _write_batch_data(self):
"""Write collected data to files."""
if not self.file_writer.is_ready():
return
for collector_name, data_list in self.batch_data.items():
if not data_list:
continue
for format_name, formatter in self.formatters.items():
# Group data by tick and write one file per tick
data_by_tick = {}
for data in data_list:
tick = data.get('tick_count', data.get('tick', self.simulation_core.state.total_ticks))
if tick not in data_by_tick:
data_by_tick[tick] = []
data_by_tick[tick].append(data)
# Write one file per tick for this collector
for tick, tick_data in data_by_tick.items():
filename = f"{collector_name}_tick{tick}.{formatter.get_file_extension()}"
# If multiple data items for same tick, combine them or write the latest one
combined_data = tick_data[-1] if len(tick_data) == 1 else {
'timestamp': tick_data[0].get('timestamp'),
'tick_count': tick,
'collection_type': collector_name,
'multiple_entries': len(tick_data),
'data': tick_data
}
formatted_data = formatter.format(combined_data)
self.file_writer.write(formatted_data, filename)
# Clear written data
data_list.clear()
print(f"Wrote batch data at tick {self.simulation_core.state.total_ticks}")
def _finalize(self):
"""Finalize simulation and write remaining data."""
print("Finalizing simulation...")
# Write any remaining data
self._write_batch_data()
# Write final summary
summary = self._get_summary()
if 'json' in self.formatters:
summary_data = self.formatters['json'].format(summary)
self.file_writer.write(summary_data, "simulation_summary.json")
# Stop simulation
self.simulation_core.stop()
self.file_writer.close()
print("Simulation completed")
def _get_summary(self) -> Dict[str, Any]:
"""Get simulation summary."""
duration = time.time() - self.start_time if self.start_time else 0
world_state = self.simulation_core.get_world_state()
return {
'simulation_config': {
'grid_width': self.config.simulation.grid_width,
'grid_height': self.config.simulation.grid_height,
'initial_cells': self.config.simulation.initial_cells,
'initial_food': self.config.simulation.initial_food,
'default_tps': self.config.simulation.default_tps
},
'runtime': {
'duration_seconds': duration,
'total_ticks': self.simulation_core.state.total_ticks,
'average_tps': self.simulation_core.state.total_ticks / duration if duration > 0 else 0,
'final_actual_tps': self.simulation_core.state.actual_tps
},
'final_state': world_state,
'data_collection': {
'collectors_used': list(self.collectors.keys()),
'output_formats': self.config.output_formats,
'output_directory': self.config.output_dir
}
}
def _signal_handler(self, signum, frame):
"""Handle shutdown signals."""
print(f"\nReceived signal {signum}, shutting down gracefully...")
self.running = False
def get_real_time_status(self) -> Dict[str, Any]:
"""Get current simulation status (useful for monitoring)."""
return {
'running': self.running,
'ticks': self.simulation_core.state.total_ticks,
'tps': self.simulation_core.state.actual_tps,
'duration': time.time() - self.start_time if self.start_time else 0,
'world_state': self.simulation_core.get_world_state()
}

3
entry_points/__init__.py Normal file
View File

@ -0,0 +1,3 @@
"""Entry points for different simulation modes."""
__all__ = []

View File

@ -0,0 +1,146 @@
#!/usr/bin/env python3
"""Headless simulation entry point - runs simulations without UI."""
import sys
import argparse
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from config import ConfigLoader, HeadlessConfig
from engines import HeadlessSimulationEngine, HeadlessConfig as EngineHeadlessConfig
def main():
"""Main entry point for headless simulation."""
parser = argparse.ArgumentParser(description="Run headless simulation")
parser.add_argument(
"--config", "-c",
type=str,
help="Path to configuration file (JSON/YAML)",
default=None
)
parser.add_argument(
"--output-dir", "-o",
type=str,
help="Output directory for simulation data",
default="simulation_output"
)
parser.add_argument(
"--max-ticks", "-t",
type=int,
help="Maximum number of ticks to run",
default=None
)
parser.add_argument(
"--max-duration", "-d",
type=float,
help="Maximum duration in seconds",
default=None
)
parser.add_argument(
"--tps",
type=float,
help="Target TPS for simulation (default: unlimited speed)",
default=None
)
parser.add_argument(
"--real-time",
action="store_true",
help="Run in real-time mode (instead of as fast as possible)"
)
parser.add_argument(
"--collect-every-tick",
action="store_true",
help="Collect data on every tick instead of at intervals"
)
parser.add_argument(
"--create-sample-configs",
action="store_true",
help="Create sample configuration files and exit"
)
args = parser.parse_args()
# Create sample configs if requested
if args.create_sample_configs:
ConfigLoader.create_sample_configs()
return
# Load configuration
try:
if args.config:
headless_config = ConfigLoader.load_headless_config(args.config)
else:
headless_config = ConfigLoader.get_default_headless_config()
# Override config with command line arguments
if args.max_ticks:
headless_config.max_ticks = args.max_ticks
if args.max_duration:
headless_config.max_duration = args.max_duration
if args.tps is not None:
headless_config.simulation.default_tps = args.tps
else:
# No TPS specified - run at maximum speed
headless_config.simulation.default_tps = 999999999.0
if args.output_dir:
headless_config.output.directory = args.output_dir
if args.real_time:
headless_config.output.real_time = True
if args.collect_every_tick:
# Set all collection intervals to 1 for every-tick collection
headless_config.output.metrics_interval = 1
headless_config.output.entities_interval = 1
headless_config.output.evolution_interval = 1
except Exception as e:
print(f"Error loading configuration: {e}")
sys.exit(1)
# Create engine configuration
engine_config = EngineHeadlessConfig(
simulation=headless_config.simulation,
max_ticks=headless_config.max_ticks,
max_duration=headless_config.max_duration,
output_dir=headless_config.output.directory,
enable_metrics=headless_config.output.collect_metrics,
enable_entities=headless_config.output.collect_entities,
enable_evolution=headless_config.output.collect_evolution,
metrics_interval=headless_config.output.metrics_interval,
entities_interval=headless_config.output.entities_interval,
evolution_interval=headless_config.output.evolution_interval,
output_formats=headless_config.output.formats,
real_time=headless_config.output.real_time
)
# Create and run simulation
try:
print("Starting headless simulation...")
engine = HeadlessSimulationEngine(engine_config)
summary = engine.run()
# Print summary
print("\n" + "="*50)
print("SIMULATION SUMMARY")
print("="*50)
print(f"Total ticks: {summary['runtime']['total_ticks']}")
print(f"Duration: {summary['runtime']['duration_seconds']:.2f} seconds")
print(f"Average TPS: {summary['runtime']['average_tps']:.2f}")
print(f"Final entity counts: {summary['final_state']['entity_counts']}")
print(f"Output directory: {summary['data_collection']['output_directory']}")
except KeyboardInterrupt:
print("\nSimulation interrupted by user")
sys.exit(0)
except Exception as e:
print(f"Simulation error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""Interactive simulation entry point - runs simulation with UI."""
import sys
import argparse
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from config import ConfigLoader, InteractiveConfig
from core.simulation_engine import SimulationEngine
def main():
"""Main entry point for interactive simulation."""
parser = argparse.ArgumentParser(description="Run interactive simulation with UI")
parser.add_argument(
"--config", "-c",
type=str,
help="Path to configuration file (JSON/YAML)",
default=None
)
parser.add_argument(
"--create-sample-configs",
action="store_true",
help="Create sample configuration files and exit"
)
args = parser.parse_args()
# Create sample configs if requested
if args.create_sample_configs:
ConfigLoader.create_sample_configs()
return
# Load configuration
try:
if args.config:
config = ConfigLoader.load_interactive_config(args.config)
else:
config = ConfigLoader.get_default_interactive_config()
except Exception as e:
print(f"Error loading configuration: {e}")
sys.exit(1)
# Run simulation
try:
print("Starting interactive simulation...")
engine = SimulationEngine()
engine.run()
except KeyboardInterrupt:
print("\nSimulation interrupted by user")
sys.exit(0)
except Exception as e:
print(f"Simulation error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()

17
output/__init__.py Normal file
View File

@ -0,0 +1,17 @@
"""Data collection and output system for headless simulations."""
from .collectors.metrics_collector import MetricsCollector
from .collectors.entity_collector import EntityCollector
from .collectors.evolution_collector import EvolutionCollector
from .formatters.json_formatter import JSONFormatter
from .formatters.csv_formatter import CSVFormatter
from .writers.file_writer import FileWriter
__all__ = [
'MetricsCollector',
'EntityCollector',
'EvolutionCollector',
'JSONFormatter',
'CSVFormatter',
'FileWriter'
]

View File

@ -0,0 +1,44 @@
"""Base data collector interface."""
from abc import ABC, abstractmethod
from typing import Any, Dict, List
from ..formatters.base_formatter import BaseFormatter
from ..writers.base_writer import BaseWriter
class BaseCollector(ABC):
"""Base class for data collectors."""
def __init__(self, collection_interval: int = 100):
self.collection_interval = collection_interval
self.last_collected_tick = -1
self.data_buffer: List[Dict[str, Any]] = []
@abstractmethod
def collect(self, simulation_core) -> Dict[str, Any]:
"""Collect data from simulation core."""
pass
def should_collect(self, current_tick: int) -> bool:
"""Check if data should be collected on current tick."""
return current_tick - self.last_collected_tick >= self.collection_interval
def update(self, simulation_core) -> List[Dict[str, Any]]:
"""Update collector and return collected data if interval reached."""
current_tick = simulation_core.state.total_ticks
if self.should_collect(current_tick):
data = self.collect(simulation_core)
self.data_buffer.append(data)
self.last_collected_tick = current_tick
return [data]
return []
def get_buffered_data(self) -> List[Dict[str, Any]]:
"""Get all buffered data and clear buffer."""
data = self.data_buffer.copy()
self.data_buffer.clear()
return data
def clear_buffer(self):
"""Clear the data buffer."""
self.data_buffer.clear()

View File

@ -0,0 +1,72 @@
"""Entity state collector for detailed entity tracking."""
from typing import Dict, Any, List
from .base_collector import BaseCollector
from world.objects import DefaultCell
class EntityCollector(BaseCollector):
"""Collects detailed entity state information."""
def __init__(self, collection_interval: int = 1000, include_cells: bool = True, include_food: bool = False):
super().__init__(collection_interval)
self.include_cells = include_cells
self.include_food = include_food
def collect(self, simulation_core) -> Dict[str, Any]:
"""Collect entity states from simulation core."""
world_state = simulation_core.get_world_state()
entities = []
for entity_data in simulation_core.get_entity_states():
entity_type = entity_data['type']
# Filter by entity type based on configuration
if entity_type == 'cell' and not self.include_cells:
continue
elif entity_type == 'food' and not self.include_food:
continue
entities.append(entity_data)
# Calculate additional statistics for cells
cell_stats = {}
if self.include_cells:
cells = [e for e in entities if e['type'] == 'cell']
if cells:
energies = [c['energy'] for c in cells]
ages = [c['age'] for c in cells]
generations = [c['generation'] for c in cells]
cell_stats = {
'avg_energy': sum(energies) / len(energies),
'max_energy': max(energies),
'min_energy': min(energies),
'avg_age': sum(ages) / len(ages),
'max_age': max(ages),
'avg_generation': sum(generations) / len(generations),
'max_generation': max(generations)
}
# Calculate food statistics
food_stats = {}
if self.include_food:
foods = [e for e in entities if e['type'] == 'food']
if foods:
decays = [f['decay'] for f in foods]
food_stats = {
'avg_decay': sum(decays) / len(decays),
'max_decay': max(decays),
'min_decay': min(decays),
'fresh_food': len([f for f in foods if f['decay'] < f['max_decay'] * 0.5])
}
return {
'timestamp': simulation_core.timing.last_tick_time,
'tick_count': world_state['tick_count'],
'entity_count': len(entities),
'entities': entities,
'cell_statistics': cell_stats,
'food_statistics': food_stats,
'collection_type': 'entities'
}

View File

@ -0,0 +1,100 @@
"""Evolution data collector for tracking neural network changes."""
from typing import Dict, Any, List
from .base_collector import BaseCollector
from world.objects import DefaultCell
import numpy as np
class EvolutionCollector(BaseCollector):
"""Collects evolution and neural network data."""
def __init__(self, collection_interval: int = 1000):
super().__init__(collection_interval)
def collect(self, simulation_core) -> Dict[str, Any]:
"""Collect evolution data from simulation core."""
world_state = simulation_core.get_world_state()
cells_data = []
network_architectures = {}
total_weights = []
layer_sizes_common = {}
for entity_data in simulation_core.get_entity_states():
if entity_data['type'] == 'cell':
cells_data.append(entity_data)
# Track neural network architecture
nn_data = entity_data['neural_network']
layers_key = str(nn_data['layer_sizes']) # Convert to string for JSON compatibility
if layers_key not in network_architectures:
network_architectures[layers_key] = 0
network_architectures[layers_key] += 1
# Use layer count as a proxy for complexity
total_weights.append(len(nn_data['layer_sizes']))
# Track layer size frequencies
for size in nn_data['layer_sizes']:
if size not in layer_sizes_common:
layer_sizes_common[size] = 0
layer_sizes_common[size] += 1
# Calculate evolution statistics
evolution_stats = {}
if cells_data:
energies = [c['energy'] for c in cells_data]
ages = [c['age'] for c in cells_data]
generations = [c['generation'] for c in cells_data]
evolution_stats = {
'cell_count': len(cells_data),
'energy_distribution': {
'mean': np.mean(energies),
'std': np.std(energies),
'min': min(energies),
'max': max(energies),
'median': np.median(energies)
},
'age_distribution': {
'mean': np.mean(ages),
'std': np.std(ages),
'min': min(ages),
'max': max(ages),
'median': np.median(ages)
},
'generation_distribution': {
'mean': np.mean(generations),
'std': np.std(generations),
'min': min(generations),
'max': max(generations),
'median': np.median(generations)
}
}
# Network architecture statistics
network_stats = {}
if total_weights:
network_stats = {
'architecture_diversity': len(network_architectures),
'most_common_architecture': max(network_architectures.items(), key=lambda x: x[1]) if network_architectures else None,
'complexity_distribution': {
'mean': np.mean(total_weights),
'std': np.std(total_weights),
'min': min(total_weights),
'max': max(total_weights)
},
'layer_size_diversity': len(layer_sizes_common),
'most_common_layer_sizes': sorted(layer_sizes_common.items(), key=lambda x: x[1], reverse=True)[:5]
}
return {
'timestamp': simulation_core.timing.last_tick_time,
'tick_count': world_state['tick_count'],
'evolution_statistics': evolution_stats,
'network_statistics': network_stats,
'network_architectures': dict(network_architectures),
'collection_type': 'evolution'
}

View File

@ -0,0 +1,28 @@
"""Basic metrics collector for simulation statistics."""
from typing import Dict, Any
from .base_collector import BaseCollector
class MetricsCollector(BaseCollector):
"""Collects basic simulation metrics."""
def __init__(self, collection_interval: int = 100):
super().__init__(collection_interval)
def collect(self, simulation_core) -> Dict[str, Any]:
"""Collect basic metrics from simulation core."""
world_state = simulation_core.get_world_state()
return {
'timestamp': simulation_core.timing.last_tick_time,
'tick_count': world_state['tick_count'],
'actual_tps': world_state['actual_tps'],
'target_tps': world_state['target_tps'],
'speed_multiplier': world_state['speed_multiplier'],
'is_paused': world_state['is_paused'],
'sprint_mode': world_state['sprint_mode'],
'world_buffer': world_state['world_buffer'],
'entity_counts': world_state['entity_counts'],
'collection_type': 'metrics'
}

View File

@ -0,0 +1,18 @@
"""Base formatter interface for output data."""
from abc import ABC, abstractmethod
from typing import Any
class BaseFormatter(ABC):
"""Base class for data formatters."""
@abstractmethod
def format(self, data: Any) -> str:
"""Format data for output."""
pass
@abstractmethod
def get_file_extension(self) -> str:
"""Get file extension for this format."""
pass

View File

@ -0,0 +1,83 @@
"""CSV formatter for tabular output data."""
import csv
import io
from typing import Any, List, Dict
from .base_formatter import BaseFormatter
class CSVFormatter(BaseFormatter):
"""Formats data as CSV."""
def __init__(self, flatten_nested: bool = True):
self.flatten_nested = flatten_nested
def format(self, data: Any) -> str:
"""Format data as CSV string."""
if isinstance(data, list):
return self._format_list(data)
elif isinstance(data, dict):
return self._format_dict(data)
else:
# Single value
return str(data)
def _format_list(self, data: List[Dict[str, Any]]) -> str:
"""Format list of dictionaries as CSV."""
if not data:
return ""
# Flatten nested dictionaries if requested
processed_data = []
for item in data:
if self.flatten_nested:
processed_data.append(self._flatten_dict(item))
else:
processed_data.append(item)
# Get all field names from all items
fieldnames = set()
for item in processed_data:
fieldnames.update(item.keys())
fieldnames = sorted(fieldnames)
# Create CSV
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(processed_data)
return output.getvalue()
def _format_dict(self, data: Dict[str, Any]) -> str:
"""Format single dictionary as CSV."""
processed_data = self._flatten_dict(data) if self.flatten_nested else data
fieldnames = sorted(processed_data.keys())
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerow(processed_data)
return output.getvalue()
def _flatten_dict(self, data: Dict[str, Any], parent_key: str = '', sep: str = '_') -> Dict[str, Any]:
"""Flatten nested dictionaries."""
items = []
for key, value in data.items():
new_key = f"{parent_key}{sep}{key}" if parent_key else key
if isinstance(value, dict):
items.extend(self._flatten_dict(value, new_key, sep).items())
elif isinstance(value, list):
# Convert lists to strings or handle each element
if value and isinstance(value[0], (int, float, str)):
items.append((new_key, ','.join(map(str, value))))
else:
items.append((new_key, str(value)))
else:
items.append((new_key, value))
return dict(items)
def get_file_extension(self) -> str:
"""Get file extension for CSV format."""
return "csv"

View File

@ -0,0 +1,20 @@
"""JSON formatter for output data."""
import json
from typing import Any
from .base_formatter import BaseFormatter
class JSONFormatter(BaseFormatter):
"""Formats data as JSON."""
def __init__(self, indent: int = 2):
self.indent = indent
def format(self, data: Any) -> str:
"""Format data as JSON string."""
return json.dumps(data, indent=self.indent, default=str)
def get_file_extension(self) -> str:
"""Get file extension for JSON format."""
return "json"

View File

@ -0,0 +1,23 @@
"""Base writer interface for output data."""
from abc import ABC, abstractmethod
from typing import Any
class BaseWriter(ABC):
"""Base class for data writers."""
@abstractmethod
def write(self, data: Any) -> bool:
"""Write data to output destination."""
pass
@abstractmethod
def close(self):
"""Close writer and cleanup resources."""
pass
@abstractmethod
def is_ready(self) -> bool:
"""Check if writer is ready for writing."""
pass

View File

@ -0,0 +1,140 @@
"""File writer for output data."""
import os
from pathlib import Path
from typing import Any, List, Optional
from .base_writer import BaseWriter
class FileWriter(BaseWriter):
"""Writes data to files."""
def __init__(self, output_dir: str = "simulation_output", create_dirs: bool = True):
self.output_dir = Path(output_dir)
self.create_dirs = create_dirs
self.file_handles = {}
self.ready = False
if self.create_dirs:
self.output_dir.mkdir(parents=True, exist_ok=True)
self.ready = self.output_dir.exists()
def write(self, data: Any, filename: Optional[str] = None, data_type: str = "data") -> bool:
"""Write data to file."""
if not self.ready:
return False
try:
if filename is None:
# Generate filename based on data type and timestamp
import time
timestamp = int(time.time())
filename = f"{data_type}_{timestamp}.txt"
filepath = self.output_dir / filename
# Handle different data types
if isinstance(data, str):
# Already formatted data
with open(filepath, 'w', encoding='utf-8') as f:
f.write(data)
elif isinstance(data, (list, dict)):
# Convert to string representation
with open(filepath, 'w', encoding='utf-8') as f:
f.write(str(data))
else:
# Single value
with open(filepath, 'w', encoding='utf-8') as f:
f.write(str(data))
return True
except Exception as e:
print(f"Error writing to file {filename}: {e}")
return False
def write_batch(self, data_list: List[Any], filename: str, data_type: str = "batch") -> bool:
"""Write multiple data items to a single file."""
if not self.ready or not data_list:
return False
try:
filepath = self.output_dir / filename
# Combine all data items
if isinstance(data_list[0], str):
# Already formatted strings
content = '\n'.join(data_list)
else:
# Convert to string representation
content = '\n'.join(str(item) for item in data_list)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
return True
except Exception as e:
print(f"Error writing batch to file {filename}: {e}")
return False
def append(self, data: Any, filename: str) -> bool:
"""Append data to existing file."""
if not self.ready:
return False
try:
filepath = self.output_dir / filename
with open(filepath, 'a', encoding='utf-8') as f:
f.write(str(data))
f.write('\n') # Add newline
return True
except Exception as e:
print(f"Error appending to file {filename}: {e}")
return False
def get_file_path(self, filename: str) -> Path:
"""Get full path for a filename."""
return self.output_dir / filename
def list_files(self, pattern: str = "*") -> List[str]:
"""List files in output directory."""
if not self.ready:
return []
try:
return [f.name for f in self.output_dir.glob(pattern) if f.is_file()]
except Exception:
return []
def delete_file(self, filename: str) -> bool:
"""Delete a file."""
if not self.ready:
return False
try:
filepath = self.output_dir / filename
if filepath.exists():
filepath.unlink()
return True
return False
except Exception as e:
print(f"Error deleting file {filename}: {e}")
return False
def close(self):
"""Close file handles and cleanup."""
for handle in self.file_handles.values():
try:
handle.close()
except:
pass
self.file_handles.clear()
def is_ready(self) -> bool:
"""Check if writer is ready for writing."""
return self.ready

View File

@ -16,4 +16,5 @@ dependencies = [
dev = [
"psutil>=7.0.0",
"ruff>=0.11.12",
"snakeviz>=2.2.2",
]

370
ui/hud.py
View File

@ -6,12 +6,18 @@ import pygame_gui
from config.constants import *
from world.base.brain import CellBrain, FlexibleNeuralNetwork
from world.objects import DefaultCell
from pygame_gui.elements import UIPanel
from pygame_gui.elements import UIPanel, UIButton, UITextEntryLine, UILabel
import math
DARK_GRAY = (40, 40, 40)
DARKER_GRAY = (25, 25, 25)
# Panel visibility constants
SHOW_CONTROL_BAR = True
SHOW_INSPECTOR_PANEL = False
SHOW_PROPERTIES_PANEL = True
SHOW_CONSOLE_PANEL = False
class HUD:
def __init__(self, ui_manager, screen_width=SCREEN_WIDTH, screen_height=SCREEN_HEIGHT):
self.font = pygame.font.Font("freesansbold.ttf", FONT_SIZE)
@ -29,90 +35,307 @@ class HUD:
self.splitter_thickness = 6
self.dragging_splitter = None
# Simulation control elements
self.play_pause_button = None
self.step_button = None
self.sprint_button = None
self.speed_buttons = {}
self.custom_tps_entry = None
self.tps_label = None
self._create_panels()
self._create_simulation_controls()
def _create_panels(self):
self.panels = []
# Top control bar
self.control_bar = UIPanel(
relative_rect=pygame.Rect(0, 0, self.screen_width, self.control_bar_height),
manager=self.manager,
object_id="#control_bar",
)
if SHOW_CONTROL_BAR:
self.control_bar = UIPanel(
relative_rect=pygame.Rect(0, 0, self.screen_width, self.control_bar_height),
manager=self.manager,
object_id="#control_bar",
)
self.panels.append(self.control_bar)
else:
self.control_bar = None
# Left inspector
self.inspector_panel = UIPanel(
relative_rect=pygame.Rect(
0, self.control_bar_height,
self.inspector_width,
self.screen_height - self.control_bar_height
),
manager=self.manager,
object_id="#inspector_panel",
)
if SHOW_INSPECTOR_PANEL:
self.inspector_panel = UIPanel(
relative_rect=pygame.Rect(
0, self.control_bar_height if SHOW_CONTROL_BAR else 0,
self.inspector_width,
self.screen_height - (self.control_bar_height if SHOW_CONTROL_BAR else 0)
),
manager=self.manager,
object_id="#inspector_panel",
)
self.panels.append(self.inspector_panel)
else:
self.inspector_panel = None
# Right properties
self.properties_panel = UIPanel(
relative_rect=pygame.Rect(
self.screen_width - self.properties_width,
self.control_bar_height,
self.properties_width,
self.screen_height - self.control_bar_height
),
manager=self.manager,
object_id="#properties_panel",
)
if SHOW_PROPERTIES_PANEL:
self.properties_panel = UIPanel(
relative_rect=pygame.Rect(
self.screen_width - self.properties_width,
self.control_bar_height if SHOW_CONTROL_BAR else 0,
self.properties_width,
self.screen_height - (self.control_bar_height if SHOW_CONTROL_BAR else 0)
),
manager=self.manager,
object_id="#properties_panel",
)
self.panels.append(self.properties_panel)
else:
self.properties_panel = None
# Bottom console
self.console_panel = UIPanel(
relative_rect=pygame.Rect(
self.inspector_width,
self.screen_height - self.console_height,
self.screen_width - self.inspector_width - self.properties_width,
self.console_height
),
if SHOW_CONSOLE_PANEL:
self.console_panel = UIPanel(
relative_rect=pygame.Rect(
self.inspector_width if SHOW_INSPECTOR_PANEL else 0,
self.screen_height - self.console_height,
self.screen_width - (self.inspector_width if SHOW_INSPECTOR_PANEL else 0) - (self.properties_width if SHOW_PROPERTIES_PANEL else 0),
self.console_height
),
manager=self.manager,
object_id="#console_panel",
)
self.panels.append(self.console_panel)
else:
self.console_panel = None
self.dragging_splitter = None
def _create_simulation_controls(self):
"""Create simulation control buttons in the control bar."""
if not self.control_bar:
return
# Button layout constants
button_width = 40
button_height = 32
button_spacing = 8
start_x = 20
start_y = 8
# Play/Pause button
self.play_pause_button = UIButton(
relative_rect=pygame.Rect(start_x, start_y, button_width, button_height),
text='>',
manager=self.manager,
object_id="#console_panel",
container=self.control_bar,
object_id="#play_pause_button"
)
self.panels = [
self.control_bar,
self.inspector_panel,
self.properties_panel,
self.console_panel
]
self.dragging_splitter = None
# Step forward button
step_x = start_x + button_width + button_spacing
self.step_button = UIButton(
relative_rect=pygame.Rect(step_x, start_y, button_width, button_height),
text='>>',
manager=self.manager,
container=self.control_bar,
object_id="#step_button"
)
# Sprint button
sprint_x = step_x + button_width + button_spacing + 5 # Extra spacing
self.sprint_button = UIButton(
relative_rect=pygame.Rect(sprint_x, start_y, button_width + 10, button_height),
text='>>|',
manager=self.manager,
container=self.control_bar,
object_id="#sprint_button"
)
# Speed control buttons
speed_labels = ["0.5x", "1x", "2x", "4x", "8x"]
speed_multipliers = [0.5, 1.0, 2.0, 4.0, 8.0]
speed_x = sprint_x + button_width + 10 + button_spacing + 10 # Extra spacing
for i, (label, multiplier) in enumerate(zip(speed_labels, speed_multipliers)):
button_x = speed_x + i * (button_width - 5 + button_spacing)
button = UIButton(
relative_rect=pygame.Rect(button_x, start_y, button_width - 5, button_height),
text=label,
manager=self.manager,
container=self.control_bar,
object_id=f"#speed_{int(multiplier*10)}x_button"
)
self.speed_buttons[multiplier] = button
# Custom TPS input
tps_x = speed_x + len(speed_labels) * (button_width - 5 + button_spacing) + button_spacing
self.custom_tps_entry = UITextEntryLine(
relative_rect=pygame.Rect(tps_x, start_y + 2, 50, button_height - 4),
manager=self.manager,
container=self.control_bar,
object_id="#custom_tps_entry"
)
self.custom_tps_entry.set_text(str(DEFAULT_TPS))
# TPS display label
tps_label_x = tps_x + 55
self.tps_label = UILabel(
relative_rect=pygame.Rect(tps_label_x, start_y + 4, 80, button_height - 8),
text='TPS: 40',
manager=self.manager,
container=self.control_bar,
object_id="#tps_label"
)
def get_viewport_rect(self):
# Returns the rect for the simulation viewport
x = self.inspector_width
y = self.control_bar_height
w = self.screen_width - self.inspector_width - self.properties_width
h = self.screen_height - self.control_bar_height - self.console_height
inspector_width = self.inspector_width if SHOW_INSPECTOR_PANEL and self.inspector_panel else 0
control_bar_height = self.control_bar_height if SHOW_CONTROL_BAR and self.control_bar else 0
properties_width = self.properties_width if SHOW_PROPERTIES_PANEL and self.properties_panel else 0
console_height = self.console_height if SHOW_CONSOLE_PANEL and self.console_panel else 0
x = inspector_width
y = control_bar_height
w = self.screen_width - inspector_width - properties_width
h = self.screen_height - control_bar_height - console_height
return pygame.Rect(x, y, w, h)
def update_layout(self, window_width, window_height):
self.screen_width = window_width
self.screen_height = window_height
control_bar_height = self.control_bar_height if SHOW_CONTROL_BAR and self.control_bar else 0
# Control bar (top)
self.control_bar.set_relative_position((0, 0))
self.control_bar.set_dimensions((self.screen_width, self.control_bar_height))
if self.control_bar:
self.control_bar.set_relative_position((0, 0))
self.control_bar.set_dimensions((self.screen_width, self.control_bar_height))
# Inspector panel (left) - goes all the way to the bottom
self.inspector_panel.set_relative_position((0, self.control_bar_height))
self.inspector_panel.set_dimensions((self.inspector_width, self.screen_height - self.control_bar_height))
# Inspector panel (left)
if self.inspector_panel:
self.inspector_panel.set_relative_position((0, control_bar_height))
self.inspector_panel.set_dimensions((self.inspector_width, self.screen_height - control_bar_height))
# Properties panel (right) - goes all the way to the bottom
self.properties_panel.set_relative_position(
(self.screen_width - self.properties_width, self.control_bar_height))
self.properties_panel.set_dimensions((self.properties_width, self.screen_height - self.control_bar_height))
# Properties panel (right)
if self.properties_panel:
self.properties_panel.set_relative_position(
(self.screen_width - self.properties_width, control_bar_height))
self.properties_panel.set_dimensions((self.properties_width, self.screen_height - control_bar_height))
# Console panel (bottom, spans between inspector and properties)
self.console_panel.set_relative_position((self.inspector_width, self.screen_height - self.console_height))
self.console_panel.set_dimensions(
(self.screen_width - self.inspector_width - self.properties_width, self.console_height))
if self.console_panel:
inspector_width = self.inspector_width if SHOW_INSPECTOR_PANEL and self.inspector_panel else 0
properties_width = self.properties_width if SHOW_PROPERTIES_PANEL and self.properties_panel else 0
self.console_panel.set_relative_position((inspector_width, self.screen_height - self.console_height))
self.console_panel.set_dimensions(
(self.screen_width - inspector_width - properties_width, self.console_height))
# Recreate simulation controls after layout change
if hasattr(self, 'play_pause_button'):
self._destroy_simulation_controls()
self._create_simulation_controls()
def _destroy_simulation_controls(self):
"""Destroy simulation control elements."""
if self.play_pause_button:
self.play_pause_button.kill()
if self.step_button:
self.step_button.kill()
if self.sprint_button:
self.sprint_button.kill()
for button in self.speed_buttons.values():
button.kill()
if self.custom_tps_entry:
self.custom_tps_entry.kill()
if self.tps_label:
self.tps_label.kill()
self.play_pause_button = None
self.step_button = None
self.sprint_button = None
self.speed_buttons = {}
self.custom_tps_entry = None
self.tps_label = None
def update_simulation_controls(self, simulation_core):
"""Update simulation control button states and displays based on simulation core state."""
if not self.play_pause_button:
return
timing_state = simulation_core.timing.state
# Update play/pause button
if timing_state.is_paused:
self.play_pause_button.set_text('>')
else:
self.play_pause_button.set_text('||')
# Update speed button highlights
speed_presets = {0.5: "0.5x", 1.0: "1x", 2.0: "2x", 4.0: "4x", 8.0: "8x"}
for multiplier, button in self.speed_buttons.items():
if (timing_state.speed_multiplier == multiplier and
not timing_state.is_paused and
not timing_state.sprint_mode):
# Active speed button - make text more prominent
button.set_text(f"[{speed_presets[multiplier]}]")
else:
# Normal button appearance
button.set_text(speed_presets[multiplier])
# Update sprint button appearance
if timing_state.sprint_mode:
self.sprint_button.set_text('')
else:
self.sprint_button.set_text('')
# Update TPS display
if self.tps_label:
if timing_state.sprint_mode:
self.tps_label.set_text(f"TPS: {timing_state.tps:.0f} (Sprint)")
else:
self.tps_label.set_text(f"TPS: {timing_state.tps:.0f}")
# Update custom TPS entry
if self.custom_tps_entry and not self.custom_tps_entry.is_focused:
self.custom_tps_entry.set_text(str(int(timing_state.tps)))
def process_event(self, event):
# Handle simulation control button events using ID matching
if event.type == pygame_gui.UI_BUTTON_START_PRESS:
object_id = str(event.ui_object_id)
if '#play_pause_button' in object_id:
return 'toggle_pause'
elif '#step_button' in object_id:
return 'step_forward'
elif '#sprint_button' in object_id:
return 'toggle_sprint'
elif '#speed_5x_button' in object_id: # 0.5x button
return 'set_speed', 0.5
elif '#speed_10x_button' in object_id: # 1x button
return 'set_speed', 1.0
elif '#speed_20x_button' in object_id: # 2x button
return 'set_speed', 2.0
elif '#speed_40x_button' in object_id: # 4x button
return 'set_speed', 4.0
elif '#speed_80x_button' in object_id: # 8x button
return 'set_speed', 8.0
elif event.type == pygame_gui.UI_TEXT_ENTRY_FINISHED:
object_id = str(event.ui_object_id)
print(f"Text entry finished: {object_id}, text: {event.text}")
if '#custom_tps_entry' in object_id:
try:
tps = float(event.text)
return 'set_custom_tps', tps
except ValueError:
# Invalid TPS value, reset to current TPS
return ('reset_tps_display',)
elif event.type == pygame_gui.UI_TEXT_ENTRY_CHANGED:
object_id = str(event.ui_object_id)
if '#custom_tps_entry' in object_id:
print(f"Text entry changed: {object_id}, text: {event.text}")
# Handle splitter dragging for resizing panels
if event.type == pygame.MOUSEBUTTONDOWN and event.button == 1:
mx, my = event.pos
@ -143,12 +366,17 @@ class HUD:
indicator_gap = 4 # Gap between indicator lines
indicator_count = 3 # Number of indicator lines
inspector_width = self.inspector_width if SHOW_INSPECTOR_PANEL and self.inspector_panel else 0
properties_width = self.properties_width if SHOW_PROPERTIES_PANEL and self.properties_panel else 0
console_height = self.console_height if SHOW_CONSOLE_PANEL and self.console_panel else 0
control_bar_height = self.control_bar_height if SHOW_CONTROL_BAR and self.control_bar else 0
# Vertical splitter (inspector/properties)
# Inspector/properties only if wide enough
if self.inspector_width > 0:
x = self.inspector_width - 2
y1 = self.control_bar_height
y2 = self.screen_height - self.console_height
if inspector_width > 0:
x = inspector_width - 2
y1 = control_bar_height
y2 = self.screen_height - console_height
# Draw indicator (horizontal lines) in the middle
mid_y = (y1 + y2) // 2
for i in range(indicator_count):
@ -160,10 +388,10 @@ class HUD:
2
)
if self.properties_width > 0:
x = self.screen_width - self.properties_width + 2
y1 = self.control_bar_height
y2 = self.screen_height - self.console_height
if properties_width > 0:
x = self.screen_width - properties_width + 2
y1 = control_bar_height
y2 = self.screen_height - console_height
mid_y = (y1 + y2) // 2
for i in range(indicator_count):
offset = (i - 1) * (indicator_gap + 1)
@ -175,10 +403,10 @@ class HUD:
)
# Horizontal splitter (console)
if self.console_height > 0:
y = self.screen_height - self.console_height + 2
x1 = self.inspector_width
x2 = self.screen_width - self.properties_width
if console_height > 0:
y = self.screen_height - console_height + 2
x1 = inspector_width
x2 = self.screen_width - properties_width
mid_x = (x1 + x2) // 2
for i in range(indicator_count):
offset = (i - 1) * (indicator_gap + 1)
@ -210,7 +438,8 @@ class HUD:
def render_tps(self, screen, actual_tps):
"""Render TPS in bottom right."""
tps_text = self.font.render(f"TPS: {actual_tps}", True, WHITE)
display_tps = round(actual_tps) # Round to nearest whole number
tps_text = self.font.render(f"TPS: {display_tps}", True, WHITE)
tps_rect = tps_text.get_rect()
tps_rect.bottomright = (self.screen_width - HUD_MARGIN, self.screen_height - HUD_MARGIN)
screen.blit(tps_text, tps_rect)
@ -644,7 +873,8 @@ class HUD:
def render_sprint_debug(self, screen, actual_tps, total_ticks, cell_count=None):
"""Render sprint debug info: header, TPS, and tick count."""
header = self.font.render("Sprinting...", True, (255, 200, 0))
tps_text = self.font.render(f"TPS: {actual_tps}", True, (255, 255, 255))
display_tps = round(actual_tps) # Round to nearest whole number
tps_text = self.font.render(f"TPS: {display_tps}", True, (255, 255, 255))
ticks_text = self.font.render(f"Ticks: {total_ticks}", True, (255, 255, 255))
cell_text = self.font.render(f"Cells: {cell_count}" if cell_count is not None else "Cells: N/A", True, (255, 255, 255))

33
uv.lock generated
View File

@ -55,6 +55,7 @@ dependencies = [
dev = [
{ name = "psutil" },
{ name = "ruff" },
{ name = "snakeviz" },
]
[package.metadata]
@ -71,6 +72,7 @@ requires-dist = [
dev = [
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "ruff", specifier = ">=0.11.12" },
{ name = "snakeviz", specifier = ">=2.2.2" },
]
[[package]]
@ -459,6 +461,37 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/44/42/d58086ec20f52d2b0140752ae54b355ea2be2ed46f914231136dd1effcc7/ruff-0.11.12-py3-none-win_arm64.whl", hash = "sha256:65194e37853158d368e333ba282217941029a28ea90913c67e558c611d04daa5", size = 10697770, upload-time = "2025-05-29T13:31:38.009Z" },
]
[[package]]
name = "snakeviz"
version = "2.2.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "tornado" },
]
sdist = { url = "https://files.pythonhosted.org/packages/04/06/82f56563b16d33c2586ac2615a3034a83a4ff1969b84c8d79339e5d07d73/snakeviz-2.2.2.tar.gz", hash = "sha256:08028c6f8e34a032ff14757a38424770abb8662fb2818985aeea0d9bc13a7d83", size = 182039, upload-time = "2024-11-09T22:03:58.99Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cd/f7/83b00cdf4f114f10750a18b64c27dc34636d0ac990ccac98282f5c0fbb43/snakeviz-2.2.2-py3-none-any.whl", hash = "sha256:77e7b9c82f6152edc330040319b97612351cd9b48c706434c535c2df31d10ac5", size = 183477, upload-time = "2024-11-09T22:03:57.049Z" },
]
[[package]]
name = "tornado"
version = "6.5.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/51/89/c72771c81d25d53fe33e3dca61c233b665b2780f21820ba6fd2c6793c12b/tornado-6.5.1.tar.gz", hash = "sha256:84ceece391e8eb9b2b95578db65e920d2a61070260594819589609ba9bc6308c", size = 509934, upload-time = "2025-05-22T18:15:38.788Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/77/89/f4532dee6843c9e0ebc4e28d4be04c67f54f60813e4bf73d595fe7567452/tornado-6.5.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:d50065ba7fd11d3bd41bcad0825227cc9a95154bad83239357094c36708001f7", size = 441948, upload-time = "2025-05-22T18:15:20.862Z" },
{ url = "https://files.pythonhosted.org/packages/15/9a/557406b62cffa395d18772e0cdcf03bed2fff03b374677348eef9f6a3792/tornado-6.5.1-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:9e9ca370f717997cb85606d074b0e5b247282cf5e2e1611568b8821afe0342d6", size = 440112, upload-time = "2025-05-22T18:15:22.591Z" },
{ url = "https://files.pythonhosted.org/packages/55/82/7721b7319013a3cf881f4dffa4f60ceff07b31b394e459984e7a36dc99ec/tornado-6.5.1-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b77e9dfa7ed69754a54c89d82ef746398be82f749df69c4d3abe75c4d1ff4888", size = 443672, upload-time = "2025-05-22T18:15:24.027Z" },
{ url = "https://files.pythonhosted.org/packages/7d/42/d11c4376e7d101171b94e03cef0cbce43e823ed6567ceda571f54cf6e3ce/tornado-6.5.1-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:253b76040ee3bab8bcf7ba9feb136436a3787208717a1fb9f2c16b744fba7331", size = 443019, upload-time = "2025-05-22T18:15:25.735Z" },
{ url = "https://files.pythonhosted.org/packages/7d/f7/0c48ba992d875521ac761e6e04b0a1750f8150ae42ea26df1852d6a98942/tornado-6.5.1-cp39-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:308473f4cc5a76227157cdf904de33ac268af770b2c5f05ca6c1161d82fdd95e", size = 443252, upload-time = "2025-05-22T18:15:27.499Z" },
{ url = "https://files.pythonhosted.org/packages/89/46/d8d7413d11987e316df4ad42e16023cd62666a3c0dfa1518ffa30b8df06c/tornado-6.5.1-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:caec6314ce8a81cf69bd89909f4b633b9f523834dc1a352021775d45e51d9401", size = 443930, upload-time = "2025-05-22T18:15:29.299Z" },
{ url = "https://files.pythonhosted.org/packages/78/b2/f8049221c96a06df89bed68260e8ca94beca5ea532ffc63b1175ad31f9cc/tornado-6.5.1-cp39-abi3-musllinux_1_2_i686.whl", hash = "sha256:13ce6e3396c24e2808774741331638ee6c2f50b114b97a55c5b442df65fd9692", size = 443351, upload-time = "2025-05-22T18:15:31.038Z" },
{ url = "https://files.pythonhosted.org/packages/76/ff/6a0079e65b326cc222a54720a748e04a4db246870c4da54ece4577bfa702/tornado-6.5.1-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:5cae6145f4cdf5ab24744526cc0f55a17d76f02c98f4cff9daa08ae9a217448a", size = 443328, upload-time = "2025-05-22T18:15:32.426Z" },
{ url = "https://files.pythonhosted.org/packages/49/18/e3f902a1d21f14035b5bc6246a8c0f51e0eef562ace3a2cea403c1fb7021/tornado-6.5.1-cp39-abi3-win32.whl", hash = "sha256:e0a36e1bc684dca10b1aa75a31df8bdfed656831489bc1e6a6ebed05dc1ec365", size = 444396, upload-time = "2025-05-22T18:15:34.205Z" },
{ url = "https://files.pythonhosted.org/packages/7b/09/6526e32bf1049ee7de3bebba81572673b19a2a8541f795d887e92af1a8bc/tornado-6.5.1-cp39-abi3-win_amd64.whl", hash = "sha256:908e7d64567cecd4c2b458075589a775063453aeb1d2a1853eedb806922f568b", size = 444840, upload-time = "2025-05-22T18:15:36.1Z" },
{ url = "https://files.pythonhosted.org/packages/55/a7/535c44c7bea4578e48281d83c615219f3ab19e6abc67625ef637c73987be/tornado-6.5.1-cp39-abi3-win_arm64.whl", hash = "sha256:02420a0eb7bf617257b9935e2b754d1b63897525d8a289c9d65690d580b4dcf7", size = 443596, upload-time = "2025-05-22T18:15:37.433Z" },
]
[[package]]
name = "typing-extensions"
version = "4.14.0"