From 3a34759094020a32eb6269a9bc29b2aedeb45dc9 Mon Sep 17 00:00:00 2001 From: Sam Date: Sat, 8 Nov 2025 19:17:40 -0600 Subject: [PATCH] Add core simulation components and configuration classes Major rewrite. --- README_ARCHITECTURE.md | 340 +++++++++++++++++++++ config/__init__.py | 19 ++ config/config_loader.py | 194 ++++++++++++ config/simulation_config.py | 109 +++++++ core/__init__.py | 21 ++ core/event_bus.py | 77 +++++ core/input_handler.py | 74 ++++- core/simulation_core.py | 310 +++++++++++++++++++ core/simulation_engine.py | 291 ++++++++++-------- core/timing.py | 186 ++++++++++++ engines/__init__.py | 5 + engines/headless_engine.py | 274 +++++++++++++++++ entry_points/__init__.py | 3 + entry_points/headless_main.py | 146 +++++++++ entry_points/interactive_main.py | 66 ++++ output/__init__.py | 17 ++ output/collectors/base_collector.py | 44 +++ output/collectors/entity_collector.py | 72 +++++ output/collectors/evolution_collector.py | 100 ++++++ output/collectors/metrics_collector.py | 28 ++ output/formatters/base_formatter.py | 18 ++ output/formatters/csv_formatter.py | 83 +++++ output/formatters/json_formatter.py | 20 ++ output/writers/base_writer.py | 23 ++ output/writers/file_writer.py | 140 +++++++++ ui/hud.py | 370 ++++++++++++++++++----- 26 files changed, 2830 insertions(+), 200 deletions(-) create mode 100644 README_ARCHITECTURE.md create mode 100644 config/__init__.py create mode 100644 config/config_loader.py create mode 100644 config/simulation_config.py create mode 100644 core/__init__.py create mode 100644 core/event_bus.py create mode 100644 core/simulation_core.py create mode 100644 core/timing.py create mode 100644 engines/__init__.py create mode 100644 engines/headless_engine.py create mode 100644 entry_points/__init__.py create mode 100644 entry_points/headless_main.py create mode 100644 entry_points/interactive_main.py create mode 100644 output/__init__.py create mode 100644 output/collectors/base_collector.py create mode 100644 output/collectors/entity_collector.py create mode 100644 output/collectors/evolution_collector.py create mode 100644 output/collectors/metrics_collector.py create mode 100644 output/formatters/base_formatter.py create mode 100644 output/formatters/csv_formatter.py create mode 100644 output/formatters/json_formatter.py create mode 100644 output/writers/base_writer.py create mode 100644 output/writers/file_writer.py diff --git a/README_ARCHITECTURE.md b/README_ARCHITECTURE.md new file mode 100644 index 0000000..662682c --- /dev/null +++ b/README_ARCHITECTURE.md @@ -0,0 +1,340 @@ +# Dynamic Abstraction System - Complete Architecture + +## Overview + +The Dynamic Abstraction System is a sophisticated 2D cellular simulation platform featuring autonomous entities with evolving neural networks. The system demonstrates headless-first architecture with clean separation of concerns, supporting both interactive development and production-scale batch processing. + +## Architecture Principles + +1. **Headless-First Design**: Core simulation logic runs independently of any UI dependencies +2. **Separation of Concerns**: Each component has a single, well-defined responsibility +3. **Loose Coupling**: Components communicate through events and interfaces rather than direct dependencies +4. **Configuration-Driven**: Behavior controlled through structured configuration files +5. **Extensible Data Collection**: Comprehensive metrics and evolution tracking capabilities +6. **Dual-Mode Support**: Both interactive debugging and headless batch processing + +## Complete Directory Structure + +``` +DynamicAbstractionSystem/ +├── main.py # Legacy entry point (interactive) +├── entry_points/ # Application entry points +│ ├── headless_main.py # Headless simulation entry +│ ├── interactive_main.py # Interactive simulation entry +│ └── __init__.py +├── core/ # Core simulation infrastructure +│ ├── simulation_core.py # Pure simulation logic (no UI deps) +│ ├── timing.py # TPS/timing management +│ ├── event_bus.py # Decoupled communication system +│ ├── simulation_engine.py # Interactive engine (UI wrapper) +│ ├── input_handler.py # Pure input processing +│ ├── renderer.py # World-to-screen rendering +│ └── __init__.py +├── engines/ # Specialized simulation engines +│ ├── headless_engine.py # Production headless engine +│ └── __init__.py +├── world/ # Simulation world and entities +│ ├── __init__.py +│ ├── world.py # Spatial partitioning system +│ ├── objects.py # Entity implementations +│ ├── base/ # Base entity systems +│ │ ├── brain.py # Neural network implementations +│ │ ├── neural.py # Flexible neural network +│ │ └── behavioral.py # Behavioral models +│ └── simulation_interface.py # Camera and spatial interfaces +├── ui/ # User interface components +│ ├── hud.py # Heads-up display and panels +│ └── __init__.py +├── config/ # Configuration management +│ ├── constants.py # Global constants +│ ├── simulation_config.py # Configuration classes +│ ├── config_loader.py # Configuration loading utilities +│ └── __init__.py +├── output/ # Data collection and output +│ ├── __init__.py +│ ├── collectors/ # Data collection modules +│ │ ├── base_collector.py # Collector interface +│ │ ├── metrics_collector.py # Basic metrics collection +│ │ ├── entity_collector.py # Entity state tracking +│ │ └── evolution_collector.py # Evolution tracking +│ ├── formatters/ # Output formatting +│ │ ├── base_formatter.py # Formatter interface +│ │ ├── json_formatter.py # JSON output +│ │ └── csv_formatter.py # CSV output +│ └── writers/ # Output destinations +│ ├── base_writer.py # Writer interface +│ └── file_writer.py # File system output +└── tests/ # Test suite + ├── __init__.py + └── [test files] +``` + +## Core System Architecture + +### 1. Simulation Layer + +#### SimulationCore (`core/simulation_core.py`) +- **Purpose**: Pure simulation logic with zero UI dependencies +- **Responsibilities**: + - World management and entity lifecycle + - Entity spawning and spatial queries + - Event-driven state management + - Timing coordination +- **Key Features**: + - Event-driven architecture via EventBus + - Configurable simulation parameters + - Complete UI isolation + - Spatial query methods (radius, range, nearest) + +#### TimingController (`core/timing.py`) +- **Purpose**: Precise timing management and simulation control +- **Responsibilities**: + - TPS (ticks per second) regulation + - Pause/sprint state management + - Speed multipliers + - Performance timing +- **Key Features**: + - Configurable target TPS + - Sprint mode for accelerated execution + - Pause/resume functionality + - Real-time performance metrics + +#### EventBus (`core/event_bus.py`) +- **Purpose**: Decoupled inter-component communication +- **Responsibilities**: + - Event routing and subscription management + - Type-safe event dispatch + - Event history tracking +- **Key Features**: + - Type-safe event system + - Event history for debugging + - Publisher-subscriber pattern + - Loose coupling between components + +### 2. Engine Layer + +#### SimulationEngine (`core/simulation_engine.py`) +- **Purpose**: Interactive simulation engine with UI integration +- **Responsibilities**: + - Main game loop coordination + - UI event processing + - Rendering pipeline management + - Component orchestration +- **Key Features**: + - Pygame integration + - Real-time input handling + - Rendering coordination + - UI state synchronization + +#### HeadlessSimulationEngine (`engines/headless_engine.py`) +- **Purpose**: Production-grade headless simulation execution +- **Responsibilities**: + - Batch simulation execution + - Data collection management + - Output generation + - Performance monitoring +- **Key Features**: + - No UI dependencies + - Maximum speed execution (2000+ TPS) + - Sprint mode integration for unlimited performance + - Precise TPS control when specified + - Comprehensive data collection + - Multiple output formats + - Configurable execution parameters + - Graceful shutdown handling + +### 3. World Layer + +#### World (`world/world.py`) +- **Purpose**: Spatial partitioning and entity management +- **Responsibilities**: + - Spatial indexing and partitioning + - Entity lifecycle management + - Collision detection support + - Spatial queries +- **Key Features**: + - Grid-based spatial partitioning + - Double buffering for performance + - Efficient spatial queries + - Entity add/remove operations + +#### Entity System (`world/objects.py`, `world/base/`) +- **Purpose**: Entity implementations and behavioral systems +- **Key Components**: + - `BaseEntity`: Abstract base class for all entities + - `DefaultCell`: Autonomous cellular entity with neural networks + - `FoodObject`: Energy source entities + - `CellBrain`: Neural network-based decision making + - `FlexibleNeuralNetwork`: Dynamic topology neural networks + +### 4. Data Collection System + +#### Collectors (`output/collectors/`) +- **BaseCollector**: Abstract base with tick-based collection logic +- **MetricsCollector**: Basic simulation metrics (TPS, entity counts, timing) +- **EntityCollector**: Detailed entity state snapshots +- **EvolutionCollector**: Neural network evolution and generational tracking +- **Tick-Based Intervals**: Uses simulation ticks for accurate collection timing +- **Every-Tick Option**: Configurable for maximum data resolution + +#### Formatters (`output/formatters/`) +- **JSONFormatter**: Human-readable JSON output +- **CSVFormatter**: Spreadsheet-compatible CSV output + +#### Writers (`output/writers/`) +- **FileWriter**: File system output with batch processing + +### 5. Configuration System + +#### Configuration Classes (`config/simulation_config.py`) +- **HeadlessConfig**: Headless simulation parameters +- **InteractiveConfig**: Interactive UI configuration +- **ExperimentConfig**: Automated experiment configuration +- **OutputConfig**: Data collection and output settings + +#### Configuration Loader (`config/config_loader.py`) +- **Purpose**: Dynamic configuration loading and validation +- **Features**: + - JSON/YAML support + - Configuration validation + - Sample configuration generation + - Lazy loading to avoid circular dependencies + +## Entry Points + +### Interactive Mode +```bash +# Standard interactive simulation +python entry_points/interactive_main.py + +# With custom configuration +python entry_points/interactive_main.py --config configs/interactive.json + +# Legacy compatibility +python main.py +``` + +### Headless Mode +```bash +# Basic headless execution (maximum speed by default) +python entry_points/headless_main.py + +# With custom parameters +python entry_points/headless_main.py --max-ticks 10000 --tps 60 --output-dir results + +# With data collection on every tick +python entry_points/headless_main.py --max-ticks 1000 --collect-every-tick + +# With configuration file +python entry_points/headless_main.py --config configs/experiment.json +``` + +## Input and Control System + +### Keyboard Controls +- **Space**: Toggle pause/play +- **Right Shift**: Toggle sprint mode +- **Left Shift**: Hold for temporary 2x speed boost +- **S**: Step forward one tick +- **R**: Reset camera position +- **G**: Toggle grid display +- **I**: Toggle interaction radius display +- **L**: Toggle legend display +- **WASD**: Move camera +- **Mouse wheel**: Zoom in/out +- **Middle mouse drag**: Pan camera +- **Left click/drag**: Select objects + +### Input Handler Architecture +- **Pure Input Processing**: No state management, only input mapping +- **Callback System**: All simulation control actions use callbacks +- **Event-Driven**: Decoupled from simulation state + +## Data Collection and Output + +### Comprehensive Metrics +- **Performance Metrics**: TPS, timing statistics, execution duration +- **Entity Statistics**: Population counts, energy distributions, age demographics +- **Evolution Data**: Neural network architectures, generational statistics, mutation tracking +- **World State**: Spatial distributions, resource availability + +### Output Formats +- **JSON**: Structured, human-readable, web-compatible +- **CSV**: Tabular format for spreadsheet analysis +- **File-based**: Batch output with configurable intervals + +## Performance Architecture + +### Spatial Optimization +- **Grid-based Spatial Partitioning**: Efficient entity queries +- **Double Buffering**: Prevents frame interference +- **Spatial Indexing**: Fast proximity and range queries + +### Timing Optimization +- **Configurable TPS**: Adjustable simulation speed +- **Maximum Speed Mode**: Unlimited execution speed for batch processing (2000+ TPS) +- **Sprint Mode Integration**: Automatic sprint mode for maximum performance +- **Precise Timing**: Frame-independent simulation updates +- **Accurate TPS Control**: Precise timing when TPS is specified + +### Memory Management +- **Entity Pooling**: Efficient memory allocation for entities +- **Event System**: Minimal memory overhead for inter-component communication +- **Lazy Loading**: Configuration and components loaded on demand + +## Testing Architecture + +### Test Coverage +- **Unit Tests**: Core component isolation testing +- **Integration Tests**: Component interaction testing +- **Performance Tests**: Benchmarking and profiling +- **End-to-End Tests**: Complete simulation workflow testing + +### Testing Tools +- **cProfile Integration**: Built-in performance profiling +- **pytest Framework**: Structured test execution +- **Mock Components**: Isolated component testing + +## Extension Points + +### Custom Collectors +- Extensible data collection system +- Plugin architecture for custom metrics +- Configurable collection intervals + +### Custom Formatters +- Pluggable output format system +- Custom serialization support +- Multiple output destinations + +### Custom Engines +- Specialized simulation engines +- Custom execution patterns +- Integration interfaces + +## Deployment Architecture + +### Production Deployment +- **Headless Execution**: Server deployment without display dependencies +- **Batch Processing**: Automated experiment execution +- **Configuration-Driven**: Environment-specific configurations +- **Data Pipeline**: Automated data collection and output + +### Development Environment +- **Interactive Debugging**: Real-time visualization and control +- **Hot Reloading**: Configuration changes without restart +- **Performance Monitoring**: Real-time TPS and performance metrics +- **Extensive Logging**: Detailed execution tracking + +## Architectural Benefits + +1. **Maintainability**: Clear separation of concerns and modular design +2. **Testability**: Isolated components enable comprehensive unit testing +3. **Scalability**: Headless mode supports large-scale batch processing +4. **Flexibility**: Configuration-driven behavior adaptation +5. **Performance**: Optimized spatial queries and timing management +6. **Extensibility**: Plugin architecture for custom functionality +7. **Reliability**: Event-driven architecture with error handling +8. **Portability**: Headless operation enables deployment across platforms + +This architecture provides a robust foundation for both interactive development and production-scale simulation experiments, with clean separation between simulation logic, user interface, and data collection systems. \ No newline at end of file diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..e159e2e --- /dev/null +++ b/config/__init__.py @@ -0,0 +1,19 @@ +"""Configuration system for simulation modes.""" + +from .simulation_config import ( + SimulationConfig, + HeadlessConfig, + InteractiveConfig, + ExperimentConfig, + OutputConfig +) +from .config_loader import ConfigLoader + +__all__ = [ + 'SimulationConfig', + 'HeadlessConfig', + 'InteractiveConfig', + 'ExperimentConfig', + 'OutputConfig', + 'ConfigLoader' +] \ No newline at end of file diff --git a/config/config_loader.py b/config/config_loader.py new file mode 100644 index 0000000..bc33253 --- /dev/null +++ b/config/config_loader.py @@ -0,0 +1,194 @@ +"""Configuration loading and management.""" + +import json +import yaml +from pathlib import Path +from typing import Dict, Any, Union +from dataclasses import asdict + +from .simulation_config import HeadlessConfig, InteractiveConfig, ExperimentConfig + + +class ConfigLoader: + """Loads configuration from files and creates config objects.""" + + @staticmethod + def load_headless_config(config_path: Union[str, Path]) -> HeadlessConfig: + """Load headless configuration from file.""" + config_path = Path(config_path) + + if not config_path.exists(): + raise FileNotFoundError(f"Config file not found: {config_path}") + + with open(config_path, 'r') as f: + if config_path.suffix.lower() == '.json': + data = json.load(f) + elif config_path.suffix.lower() in ['.yml', '.yaml']: + data = yaml.safe_load(f) + else: + raise ValueError(f"Unsupported config format: {config_path.suffix}") + + return ConfigLoader._dict_to_headless_config(data) + + @staticmethod + def load_interactive_config(config_path: Union[str, Path]) -> InteractiveConfig: + """Load interactive configuration from file.""" + config_path = Path(config_path) + + if not config_path.exists(): + # Return default config if file doesn't exist + return InteractiveConfig() + + with open(config_path, 'r') as f: + if config_path.suffix.lower() == '.json': + data = json.load(f) + elif config_path.suffix.lower() in ['.yml', '.yaml']: + data = yaml.safe_load(f) + else: + raise ValueError(f"Unsupported config format: {config_path.suffix}") + + return ConfigLoader._dict_to_interactive_config(data) + + @staticmethod + def load_experiment_config(config_path: Union[str, Path]) -> ExperimentConfig: + """Load experiment configuration from file.""" + config_path = Path(config_path) + + if not config_path.exists(): + raise FileNotFoundError(f"Config file not found: {config_path}") + + with open(config_path, 'r') as f: + if config_path.suffix.lower() == '.json': + data = json.load(f) + elif config_path.suffix.lower() in ['.yml', '.yaml']: + data = yaml.safe_load(f) + else: + raise ValueError(f"Unsupported config format: {config_path.suffix}") + + return ConfigLoader._dict_to_experiment_config(data) + + @staticmethod + def save_config(config: Union[HeadlessConfig, InteractiveConfig, ExperimentConfig], + config_path: Union[str, Path]): + """Save configuration to file.""" + config_path = Path(config_path) + config_path.parent.mkdir(parents=True, exist_ok=True) + + # Convert config to dictionary + if isinstance(config, HeadlessConfig): + data = asdict(config) + elif isinstance(config, InteractiveConfig): + data = asdict(config) + elif isinstance(config, ExperimentConfig): + data = asdict(config) + else: + raise ValueError(f"Unknown config type: {type(config)}") + + with open(config_path, 'w') as f: + if config_path.suffix.lower() == '.json': + json.dump(data, f, indent=2) + elif config_path.suffix.lower() in ['.yml', '.yaml']: + yaml.dump(data, f, default_flow_style=False) + else: + raise ValueError(f"Unsupported config format: {config_path.suffix}") + + @staticmethod + def _dict_to_headless_config(data: Dict[str, Any]) -> HeadlessConfig: + """Convert dictionary to HeadlessConfig.""" + # Extract output config if present + output_data = data.get('output', {}) + output_config = OutputConfig(**output_data) + + # Extract simulation config if present + sim_data = data.get('simulation', {}) + simulation_config = SimulationConfig(**sim_data) + + return HeadlessConfig( + max_ticks=data.get('max_ticks'), + max_duration=data.get('max_duration'), + output=output_config, + simulation=simulation_config + ) + + @staticmethod + def _dict_to_interactive_config(data: Dict[str, Any]) -> InteractiveConfig: + """Convert dictionary to InteractiveConfig.""" + # Extract simulation config if present + sim_data = data.get('simulation', {}) + simulation_config = SimulationConfig(**sim_data) + + return InteractiveConfig( + window_width=data.get('window_width', 0), + window_height=data.get('window_height', 0), + vsync=data.get('vsync', True), + resizable=data.get('resizable', True), + show_grid=data.get('show_grid', True), + show_interaction_radius=data.get('show_interaction_radius', False), + show_legend=data.get('show_legend', True), + control_bar_height=data.get('control_bar_height', 48), + inspector_width=data.get('inspector_width', 260), + properties_width=data.get('properties_width', 320), + console_height=data.get('console_height', 120), + simulation=simulation_config + ) + + @staticmethod + def _dict_to_experiment_config(data: Dict[str, Any]) -> ExperimentConfig: + """Convert dictionary to ExperimentConfig.""" + # Extract base config if present + base_data = data.get('base_config', {}) + base_config = ConfigLoader._dict_to_headless_config(base_data) + + return ExperimentConfig( + name=data.get('name', 'experiment'), + description=data.get('description', ''), + runs=data.get('runs', 1), + run_duration=data.get('run_duration'), + run_ticks=data.get('run_ticks'), + variables=data.get('variables', {}), + base_config=base_config, + aggregate_results=data.get('aggregate_results', True), + aggregate_format=data.get('aggregate_format', 'csv') + ) + + @staticmethod + def get_default_headless_config() -> HeadlessConfig: + """Get default headless configuration.""" + return HeadlessConfig() + + @staticmethod + def get_default_interactive_config() -> InteractiveConfig: + """Get default interactive configuration.""" + return InteractiveConfig() + + @staticmethod + def create_sample_configs(output_dir: str = "configs"): + """Create sample configuration files.""" + output_path = Path(output_dir) + output_path.mkdir(exist_ok=True) + + # Sample headless config + headless_config = ConfigLoader.get_default_headless_config() + ConfigLoader.save_config(headless_config, output_path / "headless_default.json") + + # Sample interactive config + interactive_config = ConfigLoader.get_default_interactive_config() + ConfigLoader.save_config(interactive_config, output_path / "interactive_default.json") + + # Sample experiment config + experiment_config = ExperimentConfig( + name="tps_sweep", + description="Test different TPS values", + runs=5, + run_duration=60.0, # 1 minute per run + variables={ + "simulation.default_tps": [10, 20, 40, 80, 160] + } + ) + ConfigLoader.save_config(experiment_config, output_path / "experiment_tps_sweep.json") + + print(f"Sample configs created in {output_path}") + + +# Import OutputConfig for the loader +from .simulation_config import OutputConfig, SimulationConfig \ No newline at end of file diff --git a/config/simulation_config.py b/config/simulation_config.py new file mode 100644 index 0000000..7352c71 --- /dev/null +++ b/config/simulation_config.py @@ -0,0 +1,109 @@ +"""Simulation configuration classes for different modes.""" + +from dataclasses import dataclass, field +from typing import List, Optional +from config.constants import * + + +@dataclass +class SimulationConfig: + """Configuration for simulation setup.""" + grid_width: int = GRID_WIDTH + grid_height: int = GRID_HEIGHT + cell_size: int = CELL_SIZE + initial_cells: int = 50 + initial_food: int = FOOD_OBJECTS_COUNT + food_spawning: bool = FOOD_SPAWNING + random_seed: int = RANDOM_SEED + default_tps: float = DEFAULT_TPS + + +@dataclass +class OutputConfig: + """Configuration for data output.""" + enabled: bool = True + directory: str = "simulation_output" + formats: List[str] = field(default_factory=lambda: ['json']) + collect_metrics: bool = True + collect_entities: bool = True + collect_evolution: bool = True + metrics_interval: int = 100 + entities_interval: int = 1000 + evolution_interval: int = 1000 + real_time: bool = False + + +@dataclass +class HeadlessConfig: + """Configuration for headless simulation mode.""" + # Simulation settings + max_ticks: Optional[int] = None + max_duration: Optional[float] = None # seconds + + # Output settings + output: OutputConfig = field(default_factory=OutputConfig) + + # Simulation core config + simulation: SimulationConfig = field(default_factory=lambda: SimulationConfig( + grid_width=GRID_WIDTH, + grid_height=GRID_HEIGHT, + cell_size=CELL_SIZE, + initial_cells=50, + initial_food=FOOD_OBJECTS_COUNT, + food_spawning=FOOD_SPAWNING, + random_seed=RANDOM_SEED, + default_tps=DEFAULT_TPS + )) + + +@dataclass +class InteractiveConfig: + """Configuration for interactive simulation mode with UI.""" + # Window settings + window_width: int = 0 # 0 = auto-detect + window_height: int = 0 # 0 = auto-detect + vsync: bool = True + resizable: bool = True + + # UI settings + show_grid: bool = True + show_interaction_radius: bool = False + show_legend: bool = True + control_bar_height: int = 48 + inspector_width: int = 260 + properties_width: int = 320 + console_height: int = 120 + + # Simulation core config + simulation: SimulationConfig = field(default_factory=lambda: SimulationConfig( + grid_width=GRID_WIDTH, + grid_height=GRID_HEIGHT, + cell_size=CELL_SIZE, + initial_cells=50, + initial_food=FOOD_OBJECTS_COUNT, + food_spawning=FOOD_SPAWNING, + random_seed=RANDOM_SEED, + default_tps=DEFAULT_TPS + )) + + +@dataclass +class ExperimentConfig: + """Configuration for automated experiments.""" + name: str = "experiment" + description: str = "" + + # Multiple runs + runs: int = 1 + run_duration: Optional[float] = None # seconds per run + run_ticks: Optional[int] = None # ticks per run + + # Variables to test (for parameter sweeps) + variables: dict = field(default_factory=dict) + + # Base configuration + base_config: HeadlessConfig = field(default_factory=HeadlessConfig) + + # Output aggregation + aggregate_results: bool = True + aggregate_format: str = "csv" \ No newline at end of file diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..0bb23f8 --- /dev/null +++ b/core/__init__.py @@ -0,0 +1,21 @@ +"""Core simulation infrastructure and engine components.""" + +from .simulation_core import SimulationCore, SimulationState +from .timing import TimingController, TimingState +from .event_bus import EventBus, EventType, Event +from .simulation_engine import SimulationEngine +from .input_handler import InputHandler +from .renderer import Renderer + +__all__ = [ + 'SimulationCore', + 'SimulationState', + 'TimingController', + 'TimingState', + 'EventBus', + 'EventType', + 'Event', + 'SimulationEngine', + 'InputHandler', + 'Renderer' +] \ No newline at end of file diff --git a/core/event_bus.py b/core/event_bus.py new file mode 100644 index 0000000..053d9e8 --- /dev/null +++ b/core/event_bus.py @@ -0,0 +1,77 @@ +"""Simple event bus for decoupled component communication.""" + +from typing import Dict, List, Callable, Any +from dataclasses import dataclass +from enum import Enum + + +class EventType(Enum): + """Types of simulation events.""" + SIMULATION_STATE_CHANGED = "simulation_state_changed" + WORLD_TICK_COMPLETED = "world_tick_completed" + ENTITY_ADDED = "entity_added" + ENTITY_REMOVED = "entity_removed" + SELECTION_CHANGED = "selection_changed" + TIMING_UPDATE = "timing_update" + + +@dataclass +class Event: + """Event data structure.""" + type: EventType + data: Dict[str, Any] + timestamp: float = None + + def __post_init__(self): + import time + if self.timestamp is None: + self.timestamp = time.time() + + +class EventBus: + """Simple event bus for decoupled communication.""" + + def __init__(self): + self._subscribers: Dict[EventType, List[Callable]] = {} + self._event_history: List[Event] = [] + self._max_history = 1000 + + def subscribe(self, event_type: EventType, callback: Callable[[Event], None]): + """Subscribe to an event type.""" + if event_type not in self._subscribers: + self._subscribers[event_type] = [] + self._subscribers[event_type].append(callback) + + def unsubscribe(self, event_type: EventType, callback: Callable[[Event], None]): + """Unsubscribe from an event type.""" + if event_type in self._subscribers: + try: + self._subscribers[event_type].remove(callback) + except ValueError: + pass + + def publish(self, event: Event): + """Publish an event to all subscribers.""" + # Store in history + self._event_history.append(event) + if len(self._event_history) > self._max_history: + self._event_history.pop(0) + + # Notify subscribers + if event.type in self._subscribers: + for callback in self._subscribers[event.type]: + try: + callback(event) + except Exception as e: + print(f"Error in event callback: {e}") + + def get_recent_events(self, event_type: EventType = None, count: int = 10) -> List[Event]: + """Get recent events, optionally filtered by type.""" + events = self._event_history + if event_type: + events = [e for e in events if e.type == event_type] + return events[-count:] + + def clear_history(self): + """Clear event history.""" + self._event_history.clear() \ No newline at end of file diff --git a/core/input_handler.py b/core/input_handler.py index 97e3586..ab4d857 100644 --- a/core/input_handler.py +++ b/core/input_handler.py @@ -1,35 +1,42 @@ # core/input_handler.py -"""Handles all input events and camera controls.""" +"""Handles input events and camera controls - no state management.""" import pygame from config.constants import * class InputHandler: + """Pure input handler - processes input without managing simulation state.""" + def __init__(self, camera, world, sim_view_rect): self.camera = camera self.world = world - # Selection state + # Selection state (input-specific, not simulation state) self.selecting = False self.select_start = None self.select_end = None self.selected_objects = [] - # UI state flags + # UI display flags (input-controlled visual settings) self.show_grid = True self.show_interaction_radius = False self.show_legend = False - self.is_paused = False - # Speed control + # Simulation state references (synchronized from external source) self.tps = DEFAULT_TPS self.default_tps = DEFAULT_TPS + self.is_paused = False self.sprint_mode = False + self.is_stepping = False + self.speed_multiplier = 1.0 # sim-view rect for mouse position calculations self.sim_view_rect = sim_view_rect + # Action callbacks for simulation control + self.action_callbacks = {} + def update_sim_view_rect(self, sim_view_rect): """Update the sim_view rectangle.""" self.sim_view_rect = sim_view_rect @@ -79,20 +86,24 @@ class InputHandler: elif event.key == pygame.K_l: self.show_legend = not self.show_legend elif event.key == pygame.K_SPACE: - self.is_paused = not self.is_paused + self.toggle_pause() elif event.key == pygame.K_LSHIFT: - self.tps = self.default_tps * TURBO_MULTIPLIER + # Left Shift for temporary speed boost (turbo mode) + self.set_speed_multiplier(2.0) elif event.key == pygame.K_r: self.camera.reset_position() elif event.key == pygame.K_RSHIFT: - self.sprint_mode = not self.sprint_mode # Enter sprint mode + self.toggle_sprint_mode() # Right Shift toggles sprint mode + elif event.key == pygame.K_s: + self.step_forward() # Step forward return running def _handle_keyup(self, event): """Handle keyup events.""" if event.key == pygame.K_LSHIFT: - self.tps = self.default_tps + # Reset speed multiplier when Left Shift is released + self.set_speed_multiplier(1.0) # if event.key == pygame.K_RSHIFT: # self.sprint_mode = False # Exit sprint mode @@ -181,4 +192,47 @@ class InputHandler: width = abs(self.select_end[0] - self.select_start[0]) height = abs(self.select_end[1] - self.select_start[1]) return (left, top, width, height) - return None \ No newline at end of file + return None + + def set_action_callback(self, action_name: str, callback): + """Set callback for simulation control actions.""" + self.action_callbacks[action_name] = callback + + def toggle_pause(self): + """Toggle pause state via callback.""" + if 'toggle_pause' in self.action_callbacks: + self.action_callbacks['toggle_pause']() + + def step_forward(self): + """Execute single simulation step via callback.""" + self.is_stepping = True + if 'step_forward' in self.action_callbacks: + self.action_callbacks['step_forward']() + + def set_speed_multiplier(self, multiplier): + """Set speed multiplier for simulation via callback.""" + if 'set_speed' in self.action_callbacks: + self.action_callbacks['set_speed'](multiplier) + + def set_custom_tps(self, tps): + """Set custom TPS value via callback.""" + if 'set_custom_tps' in self.action_callbacks: + self.action_callbacks['set_custom_tps'](tps) + + def toggle_sprint_mode(self): + """Toggle sprint mode via callback.""" + if 'toggle_sprint' in self.action_callbacks: + self.action_callbacks['toggle_sprint']() + + def get_current_speed_display(self): + """Get current speed display string.""" + if self.sprint_mode: + return "Sprint" + elif self.is_paused: + return "Paused" + elif self.speed_multiplier == 1.0: + return "1x" + elif self.speed_multiplier in [0.5, 2.0, 4.0, 8.0]: + return f"{self.speed_multiplier}x" + else: + return f"{self.speed_multiplier:.1f}x" \ No newline at end of file diff --git a/core/simulation_core.py b/core/simulation_core.py new file mode 100644 index 0000000..b282bad --- /dev/null +++ b/core/simulation_core.py @@ -0,0 +1,310 @@ +"""Pure simulation core with no UI dependencies.""" + +import time +import random +from typing import List, Optional, Dict, Any +from dataclasses import dataclass + +from world.world import World, Position, Rotation +from world.simulation_interface import Camera +from .event_bus import EventBus, EventType, Event +from .timing import TimingController +from config.constants import * +from config.simulation_config import SimulationConfig + + +@dataclass +class SimulationState: + """Current simulation state.""" + total_ticks: int = 0 + actual_tps: float = 0.0 + is_running: bool = False + world_buffer: int = 0 + + +class SimulationCore: + """Pure simulation logic with no UI dependencies.""" + + def __init__(self, config: SimulationConfig, event_bus: Optional[EventBus] = None): + self.config = config + self.event_bus = event_bus or EventBus() + + # Initialize timing controller + self.timing = TimingController(config.default_tps, self.event_bus) + + # Initialize world + self.world = self._setup_world() + + # Simulation state + self.state = SimulationState() + self.is_running = False + + # Camera (for spatial calculations, not rendering) + self.camera = Camera( + config.grid_width * config.cell_size, + config.grid_height * config.cell_size, + RENDER_BUFFER + ) + + def _setup_world(self) -> World: + """Setup the simulation world.""" + # Import locally to avoid circular imports + from world.objects import DefaultCell, FoodObject + + world = World( + self.config.cell_size, + ( + self.config.cell_size * self.config.grid_width, + self.config.cell_size * self.config.grid_height + ) + ) + + # Set random seed for reproducibility + random.seed(self.config.random_seed) + + # Calculate world bounds + half_width = self.config.grid_width * self.config.cell_size // 2 + half_height = self.config.grid_height * self.config.cell_size // 2 + + # Add initial food + if self.config.food_spawning: + for _ in range(self.config.initial_food): + x = random.randint(-half_width // 2, half_width // 2) + y = random.randint(-half_height // 2, half_height // 2) + food = FoodObject(Position(x=x, y=y)) + world.add_object(food) + self._notify_entity_added(food) + + # Add initial cells + for _ in range(self.config.initial_cells): + cell = DefaultCell( + Position( + x=random.randint(-half_width // 2, half_width // 2), + y=random.randint(-half_height // 2, half_height // 2) + ), + Rotation(angle=0) + ) + # Mutate the initial behavioral model for variety + cell.behavioral_model = cell.behavioral_model.mutate(3) + world.add_object(cell) + self._notify_entity_added(cell) + + return world + + def start(self): + """Start the simulation.""" + self.is_running = True + self.state.is_running = True + self.timing.reset_counters() + + def stop(self): + """Stop the simulation.""" + self.is_running = False + self.state.is_running = False + + def pause(self): + """Pause the simulation.""" + self.timing.set_pause(True) + + def resume(self): + """Resume the simulation.""" + self.timing.set_pause(False) + + def step(self): + """Execute a single simulation step.""" + was_paused = self.timing.state.is_paused + self.timing.set_pause(False) + self._tick() + self.timing.set_pause(was_paused) + + def update(self, delta_time: float): + """Update simulation timing and tick if needed.""" + if not self.is_running: + return + + # Handle sprint mode + if self.timing.state.sprint_mode: + self._handle_sprint_mode() + return + + # Handle single step + if hasattr(self, '_stepping') and self._stepping: + self._tick() + self._stepping = False + return + + # Normal timing-based ticks + if self.timing.should_tick(): + self._tick() + + # Update timing + current_time = time.perf_counter() + if current_time - self.timing.last_tps_time >= 1.0: + self.state.actual_tps = self.timing.actual_tps + self.state.total_ticks = self.timing.total_ticks + + def _tick(self): + """Execute a single simulation tick.""" + # Update world + self.world.tick_all() + + # Update timing + self.timing.update_timing() + + # Update state + self.state.total_ticks = self.timing.total_ticks + self.state.actual_tps = self.timing.actual_tps + self.state.world_buffer = self.world.current_buffer + + # Notify subscribers + self._notify_world_tick_completed() + + def _handle_sprint_mode(self): + """Handle sprint mode execution.""" + if not self.timing.state.sprint_mode: + return + + start_time = time.perf_counter() + + # Execute multiple ticks in sprint mode + while time.perf_counter() - start_time < 0.05: # 50ms of sprint + self._tick() + + def set_tps(self, tps: float): + """Set target TPS.""" + self.timing.set_tps(tps) + + def set_speed_multiplier(self, multiplier: float): + """Set speed multiplier.""" + self.timing.set_speed_multiplier(multiplier) + + def toggle_pause(self): + """Toggle pause state.""" + self.timing.toggle_pause() + + def toggle_sprint_mode(self): + """Toggle sprint mode.""" + self.timing.toggle_sprint_mode() + + def get_entities_in_radius(self, position: Position, radius: float) -> List: + """Get entities within radius of position.""" + return self.world.query_objects_within_radius(position.x, position.y, radius) + + def get_entities_in_range(self, min_pos: Position, max_pos: Position) -> List: + """Get entities within rectangular range.""" + return self.world.query_objects_in_range(min_pos.x, min_pos.y, max_pos.x, max_pos.y) + + def get_closest_entity(self, position: Position, max_distance: float = None) -> Optional: + """Get closest entity to position.""" + if max_distance is not None: + # Filter by distance after getting closest + closest = self.world.query_closest_object(position.x, position.y) + if closest: + dx = closest.position.x - position.x + dy = closest.position.y - position.y + distance = (dx * dx + dy * dy) ** 0.5 + if distance <= max_distance: + return closest + return None + else: + return self.world.query_closest_object(position.x, position.y) + + def count_entities_by_type(self, entity_type) -> int: + """Count entities of specific type.""" + count = 0 + for entity in self.world.get_objects(): + if isinstance(entity, entity_type): + count += 1 + return count + + def get_world_state(self) -> Dict[str, Any]: + """Get current world state for data collection.""" + # Import locally to avoid circular imports + from world.objects import DefaultCell, FoodObject + + return { + 'tick_count': self.state.total_ticks, + 'actual_tps': self.state.actual_tps, + 'world_buffer': self.state.world_buffer, + 'is_paused': self.timing.state.is_paused, + 'sprint_mode': self.timing.state.sprint_mode, + 'target_tps': self.timing.state.tps, + 'speed_multiplier': self.timing.state.speed_multiplier, + 'entity_counts': { + 'total': len(list(self.world.get_objects())), + 'cells': self.count_entities_by_type(DefaultCell), + 'food': self.count_entities_by_type(FoodObject) + } + } + + def get_entity_states(self) -> List[Dict[str, Any]]: + """Get states of all entities for data collection.""" + # Import locally to avoid circular imports + from world.objects import DefaultCell, FoodObject + + entities = [] + for entity in self.world.get_objects(): + if isinstance(entity, DefaultCell): + # Get neural network info safely + nn = entity.behavioral_model.neural_network if hasattr(entity.behavioral_model, 'neural_network') else None + layer_sizes = [len(nn.layers)] if nn and hasattr(nn, 'layers') else [4, 6, 2] # Default estimate + + entities.append({ + 'id': id(entity), + 'type': 'cell', + 'position': {'x': entity.position.x, 'y': entity.position.y}, + 'rotation': entity.rotation.angle, + 'energy': entity.energy, + 'age': entity.tick_count, # tick_count represents age + 'generation': getattr(entity, 'generation', 0), # Default to 0 if not present + 'neural_network': { + 'layer_sizes': layer_sizes, + 'has_neural_network': nn is not None + } + }) + elif isinstance(entity, FoodObject): + entities.append({ + 'id': id(entity), + 'type': 'food', + 'position': {'x': entity.position.x, 'y': entity.position.y}, + 'decay': entity.decay, + 'max_decay': entity.max_decay + }) + return entities + + def _notify_world_tick_completed(self): + """Notify subscribers that world tick completed.""" + if self.event_bus: + event = Event( + type=EventType.WORLD_TICK_COMPLETED, + data={ + 'tick_count': self.state.total_ticks, + 'world_state': self.get_world_state() + } + ) + self.event_bus.publish(event) + + def _notify_entity_added(self, entity): + """Notify subscribers of entity addition.""" + if self.event_bus: + event = Event( + type=EventType.ENTITY_ADDED, + data={ + 'entity_id': id(entity), + 'entity_type': type(entity).__name__, + 'position': {'x': entity.position.x, 'y': entity.position.y} + } + ) + self.event_bus.publish(event) + + def _notify_entity_removed(self, entity): + """Notify subscribers of entity removal.""" + if self.event_bus: + event = Event( + type=EventType.ENTITY_REMOVED, + data={ + 'entity_id': id(entity), + 'entity_type': type(entity).__name__ + } + ) + self.event_bus.publish(event) \ No newline at end of file diff --git a/core/simulation_engine.py b/core/simulation_engine.py index 1326120..b01f09a 100644 --- a/core/simulation_engine.py +++ b/core/simulation_engine.py @@ -1,17 +1,15 @@ import pygame import time -import random import sys from pygame_gui import UIManager -from world.base.brain import CellBrain, FlexibleNeuralNetwork -from world.world import World, Position, Rotation -from world.objects import FoodObject, DefaultCell from world.simulation_interface import Camera from config.constants import * from core.input_handler import InputHandler from core.renderer import Renderer +from core.simulation_core import SimulationCore, SimulationConfig +from core.event_bus import EventBus from ui.hud import HUD import cProfile @@ -19,17 +17,21 @@ import pstats class SimulationEngine: + """Interactive simulation engine with UI (wrapper around SimulationCore).""" + def __init__(self): pygame.init() + self.event_bus = EventBus() self._init_window() - self._init_ui() self._init_simulation() + self._init_ui() self.running = True def _profile_single_tick(self): + """Profile a single tick for performance analysis.""" profiler = cProfile.Profile() profiler.enable() - self.world.tick_all() + self.simulation_core.world.tick_all() profiler.disable() profiler.dump_stats('profile_tick.prof') # Save to file @@ -51,17 +53,47 @@ class SimulationEngine: self._update_simulation_view() def _init_simulation(self): - self.last_tick_time = time.perf_counter() - self.last_tps_time = time.perf_counter() - self.tick_counter = 0 - self.actual_tps = 0 - self.total_ticks = 0 + # Initialize default sim view rect (will be updated by _init_ui) + self.sim_view_width = self.window_width - 400 # Rough estimate for inspector width + self.sim_view_height = self.window_height - 200 # Rough estimate for control bar height + self.sim_view = pygame.Surface((self.sim_view_width, self.sim_view_height)) + self.sim_view_rect = self.sim_view.get_rect(topleft=(200, 48)) # Rough estimate - self.world = self._setup_world() - self.input_handler = InputHandler(self.camera, self.world, self.sim_view_rect) + # Create simulation core + sim_config = SimulationConfig( + grid_width=GRID_WIDTH, + grid_height=GRID_HEIGHT, + cell_size=CELL_SIZE, + initial_cells=50, + initial_food=FOOD_OBJECTS_COUNT, + food_spawning=FOOD_SPAWNING, + random_seed=RANDOM_SEED, + default_tps=DEFAULT_TPS + ) + + self.simulation_core = SimulationCore(sim_config, self.event_bus) + + # Setup input handler with simulation core world + self.input_handler = InputHandler( + self.simulation_core.camera, + self.simulation_core.world, + self.sim_view_rect + ) + self.input_handler.tps = self.simulation_core.timing.state.tps + self.input_handler.default_tps = DEFAULT_TPS + + # Set up action callbacks for input handler + self.input_handler.set_action_callback('toggle_pause', self.simulation_core.toggle_pause) + self.input_handler.set_action_callback('step_forward', self.simulation_core.step) + self.input_handler.set_action_callback('set_speed', self.simulation_core.set_speed_multiplier) + self.input_handler.set_action_callback('set_custom_tps', self.simulation_core.set_tps) + self.input_handler.set_action_callback('toggle_sprint', self.simulation_core.toggle_sprint_mode) + + # Setup renderer self.renderer = Renderer(self.sim_view) - self._profile_single_tick() # Profile a single tick for performance analysis + # Profile a single tick for performance analysis + self._profile_single_tick() def _update_simulation_view(self): viewport_rect = self.hud.get_viewport_rect() @@ -73,170 +105,189 @@ class SimulationEngine: self.ui_manager.set_window_resolution((self.window_width, self.window_height)) self.renderer = Renderer(self.sim_view) - if hasattr(self, 'camera'): - self.camera.screen_width = self.sim_view_width - self.camera.screen_height = self.sim_view_height + # Update simulation core camera dimensions + self.simulation_core.camera.screen_width = self.sim_view_width + self.simulation_core.camera.screen_height = self.sim_view_height - if hasattr(self, 'input_handler'): - self.input_handler.update_sim_view_rect(self.sim_view_rect) - - if not hasattr(self, 'camera'): - self.camera = Camera(self.sim_view_width, self.sim_view_height, RENDER_BUFFER) - - @staticmethod - def _setup_world(): - world = World(CELL_SIZE, (CELL_SIZE * GRID_WIDTH, CELL_SIZE * GRID_HEIGHT)) - random.seed(RANDOM_SEED) - - half_width = GRID_WIDTH * CELL_SIZE // 2 - half_height = GRID_HEIGHT * CELL_SIZE // 2 - - if FOOD_SPAWNING: - for _ in range(FOOD_OBJECTS_COUNT): - x = random.randint(-half_width // 2, half_width // 2) - y = random.randint(-half_height // 2, half_height // 2) - world.add_object(FoodObject(Position(x=x, y=y))) - - for _ in range(50): - new_cell = DefaultCell( - Position(x=random.randint(-half_width // 2, half_width // 2), y=random.randint(-half_height // 2, half_height // 2)), - Rotation(angle=0) - ) - new_cell.behavioral_model = new_cell.behavioral_model.mutate(3) - world.add_object(new_cell) - - return world + # Update input handler simulation view rect + self.input_handler.update_sim_view_rect(self.sim_view_rect) def _count_cells(self): - count = 0 - for entity in self.world.get_objects(): - if isinstance(entity, DefaultCell): - count += 1 - return count + """Count cells in the simulation.""" + # Import locally to avoid circular import + from world.objects import DefaultCell + return self.simulation_core.count_entities_by_type(DefaultCell) def run(self): - print(self.world.current_buffer) + """Run the interactive simulation engine.""" + print(f"World buffer: {self.simulation_core.world.current_buffer}") + self.simulation_core.start() + while self.running: self._handle_frame() + + self.simulation_core.stop() pygame.quit() sys.exit() def _handle_frame(self): + """Handle a single frame in the interactive simulation.""" deltatime = self.clock.get_time() / 1000.0 - tick_interval = 1.0 / self.input_handler.tps + # Handle events events = pygame.event.get() self.running = self.input_handler.handle_events(events, self.hud.manager) - self._handle_window_events(events) + # Process HUD events and window events + for event in events: + hud_action = self.hud.process_event(event) + self._handle_hud_actions(hud_action) + + if event.type == pygame.VIDEORESIZE: + self._handle_window_resize(event) + + # Sync input handler state with simulation core timing + self._sync_input_and_timing() + + # Handle sprint mode if self.input_handler.sprint_mode: self._handle_sprint_mode() return - # Only process one tick per frame if enough time has passed - if not self.input_handler.is_paused: - current_time = time.perf_counter() - if current_time - self.last_tick_time >= tick_interval: - self.last_tick_time += tick_interval - self.tick_counter += 1 - self.total_ticks += 1 - self.input_handler.update_selected_objects() - self.world.tick_all() - self.hud.manager.update(deltatime) - if current_time - self.last_tps_time >= 1.0: - self.actual_tps = self.tick_counter - self.tick_counter = 0 - self.last_tps_time += 1.0 - else: - self.last_tick_time = time.perf_counter() - self.last_tps_time = time.perf_counter() + # Update UI manager every frame + self.hud.manager.update(deltatime) - self.hud.manager.draw_ui(self.screen) + # Handle step-forward mode + if self.input_handler.is_stepping: + self.simulation_core.step() + self.input_handler.is_stepping = False + else: + # Update simulation using core + self.simulation_core.update(deltatime) + + # Update selected objects in input handler + self.input_handler.update_selected_objects() + + # Render frame self._update(deltatime) self._render() - def _handle_window_events(self, events): - for event in events: - self.hud.process_event(event) - if event.type == pygame.VIDEORESIZE: - self.window_width, self.window_height = event.w, event.h - self.screen = pygame.display.set_mode( - (self.window_width, self.window_height), - pygame.RESIZABLE - ) - self._update_simulation_view() - self.hud.update_layout(self.window_width, self.window_height) + def _sync_input_and_timing(self): + """Synchronize input handler state with simulation core timing.""" + timing_state = self.simulation_core.timing.state - self.hud.update_layout(self.window_width, self.window_height) + # Sync TPS + self.input_handler.tps = timing_state.tps + + # Sync pause state + self.input_handler.is_paused = timing_state.is_paused + + # Sync sprint mode + self.input_handler.sprint_mode = timing_state.sprint_mode + + # Sync speed multiplier + self.input_handler.speed_multiplier = timing_state.speed_multiplier + + def _handle_window_resize(self, event): + """Handle window resize event.""" + self.window_width, self.window_height = event.w, event.h + self.screen = pygame.display.set_mode( + (self.window_width, self.window_height), + pygame.RESIZABLE + ) self._update_simulation_view() + self.hud.update_layout(self.window_width, self.window_height) + + + def _handle_hud_actions(self, action): + """Handle actions from HUD simulation controls by forwarding to simulation core.""" + if action == 'toggle_pause': + self.simulation_core.toggle_pause() + elif action == 'step_forward': + self.simulation_core.step() + elif action == 'toggle_sprint': + self.simulation_core.toggle_sprint_mode() + elif isinstance(action, tuple) and action[0] == 'set_speed': + self.simulation_core.set_speed_multiplier(action[1]) + elif isinstance(action, tuple) and action[0] == 'set_custom_tps': + self.simulation_core.set_tps(action[1]) + elif isinstance(action, tuple) and action[0] == 'reset_tps_display': + # Reset TPS display to current value + if self.hud.custom_tps_entry: + self.hud.custom_tps_entry.set_text(str(int(self.simulation_core.timing.state.tps))) def _handle_sprint_mode(self): + """Handle sprint mode by running multiple simulation ticks quickly.""" current_time = time.perf_counter() - while True: + while time.perf_counter() - current_time < 0.05: # 50ms of sprint + self.simulation_core.update(0.016) # Update simulation self.input_handler.update_selected_objects() - self.world.tick_all() - self.tick_counter += 1 - self.total_ticks += 1 pygame.event.pump() # Prevent event queue overflow - if time.perf_counter() - current_time > 0.05: - break - if time.perf_counter() - self.last_tps_time >= 1.0: - self.actual_tps = self.tick_counter - self.tick_counter = 0 - self.last_tps_time = time.perf_counter() + + # Render sprint debug info self.screen.fill(BLACK) self.renderer.clear_screen() cell_count = self._count_cells() - self.hud.render_sprint_debug(self.screen, self.actual_tps, self.total_ticks, cell_count) + self.hud.render_sprint_debug( + self.screen, + self.simulation_core.state.actual_tps, + self.simulation_core.state.total_ticks, + cell_count + ) pygame.display.flip() self.clock.tick(MAX_FPS) - self.last_tick_time = time.perf_counter() - - def _handle_simulation_ticks(self, tick_interval, deltatime): - current_time = time.perf_counter() - while current_time - self.last_tick_time >= tick_interval: - self.last_tick_time += tick_interval - self.tick_counter += 1 - self.total_ticks += 1 - self.input_handler.update_selected_objects() - self.world.tick_all() - self.hud.manager.update(deltatime) - if current_time - self.last_tps_time >= 1.0: - self.actual_tps = self.tick_counter - self.tick_counter = 0 - self.last_tps_time += 1.0 def _update(self, deltatime): + """Update camera based on input.""" keys = pygame.key.get_pressed() self.input_handler.update_camera(keys, deltatime) def _render(self): + """Render the simulation frame.""" self.screen.fill(BLACK) self.renderer.clear_screen() if not self.hud.dragging_splitter: - self.renderer.draw_grid(self.camera, self.input_handler.show_grid) - self.renderer.render_world(self.world, self.camera) + # Render world + self.renderer.draw_grid(self.simulation_core.camera, self.input_handler.show_grid) + self.renderer.render_world(self.simulation_core.world, self.simulation_core.camera) self.renderer.render_interaction_radius( - self.world, self.camera, self.input_handler.selected_objects, self.input_handler.show_interaction_radius + self.simulation_core.world, + self.simulation_core.camera, + self.input_handler.selected_objects, + self.input_handler.show_interaction_radius + ) + self.renderer.render_selection_rectangle( + self.input_handler.get_selection_rect(), + self.sim_view_rect + ) + self.renderer.render_selected_objects_outline( + self.input_handler.selected_objects, + self.simulation_core.camera ) - self.renderer.render_selection_rectangle(self.input_handler.get_selection_rect(), self.sim_view_rect) - self.renderer.render_selected_objects_outline(self.input_handler.selected_objects, self.camera) self.screen.blit(self.sim_view, (self.sim_view_rect.left, self.sim_view_rect.top)) + # Update HUD displays with simulation core state + self.hud.update_simulation_controls(self.simulation_core) + + # Draw UI elements self.hud.manager.draw_ui(self.screen) self.hud.draw_splitters(self.screen) - # self.hud.render_mouse_position(self.screen, self.camera, self.sim_view_rect) + # Render HUD overlays self.hud.render_fps(self.screen, self.clock) - self.hud.render_tps(self.screen, self.actual_tps) - # self.hud.render_tick_count(self.screen, self.total_ticks) + self.hud.render_tps(self.screen, self.simulation_core.state.actual_tps) self.hud.render_selected_objects_info(self.screen, self.input_handler.selected_objects) self.hud.render_legend(self.screen, self.input_handler.show_legend) - self.hud.render_pause_indicator(self.screen, self.input_handler.is_paused) + self.hud.render_pause_indicator(self.screen, self.simulation_core.timing.state.is_paused) + + # Render neural network visualization for selected object if self.input_handler.selected_objects: - self.hud.render_neural_network_visualization(self.screen, self.input_handler.selected_objects[0]) + self.hud.render_neural_network_visualization( + self.screen, + self.input_handler.selected_objects[0] + ) pygame.display.flip() self.clock.tick(MAX_FPS) \ No newline at end of file diff --git a/core/timing.py b/core/timing.py new file mode 100644 index 0000000..d901603 --- /dev/null +++ b/core/timing.py @@ -0,0 +1,186 @@ +"""Timing and TPS management for simulation.""" + +import time +from dataclasses import dataclass +from typing import Optional +from .event_bus import EventBus, EventType, Event + + +@dataclass +class TimingState: + """Current timing state.""" + tps: float + is_paused: bool = False + sprint_mode: bool = False + speed_multiplier: float = 1.0 + + def __post_init__(self): + if self.speed_multiplier == 0: + self.speed_multiplier = 1.0 + + +class TimingController: + """Manages simulation timing, TPS, and pause states.""" + + def __init__(self, default_tps: float = 40.0, event_bus: Optional[EventBus] = None): + self.default_tps = default_tps + self.state = TimingState(tps=default_tps) + self.event_bus = event_bus + + # Timing variables + self.last_tick_time = time.perf_counter() + self.last_tps_time = time.perf_counter() + self.tick_counter = 0 + self.total_ticks = 0 + self.actual_tps = 0 + + # Sprint mode specific + self.sprint_start_time = None + + def set_tps(self, tps: float): + """Set target TPS.""" + if tps > 0: + self.state.tps = max(1.0, min(1000.0, tps)) + self.state.speed_multiplier = self.state.tps / self.default_tps + self._notify_state_change() + + def set_speed_multiplier(self, multiplier: float): + """Set speed multiplier for simulation.""" + if multiplier > 0: + self.state.speed_multiplier = max(0.1, min(10.0, multiplier)) + self.state.tps = self.default_tps * self.state.speed_multiplier + self._notify_state_change() + + def toggle_pause(self): + """Toggle pause state.""" + self.state.is_paused = not self.state.is_paused + if self.state.is_paused: + # Reset timing when paused + self.last_tick_time = time.perf_counter() + self.last_tps_time = time.perf_counter() + self._notify_state_change() + + def set_pause(self, paused: bool): + """Set pause state directly.""" + if self.state.is_paused != paused: + self.toggle_pause() + + def toggle_sprint_mode(self): + """Toggle sprint mode.""" + self.state.sprint_mode = not self.state.sprint_mode + if self.state.sprint_mode: + self.sprint_start_time = time.perf_counter() + else: + self.sprint_start_time = None + self._notify_state_change() + + def set_sprint_mode(self, enabled: bool): + """Set sprint mode directly.""" + if self.state.sprint_mode != enabled: + self.toggle_sprint_mode() + + def should_tick(self) -> bool: + """Check if simulation should tick based on timing.""" + if self.state.is_paused: + return False + + if self.state.sprint_mode: + return True + + tick_interval = 1.0 / self.state.tps + current_time = time.perf_counter() + return current_time - self.last_tick_time >= tick_interval + + def update_timing(self): + """Update timing variables after a tick.""" + current_time = time.perf_counter() + + # Update tick counters first + self.tick_counter += 1 + self.total_ticks += 1 + + if self.state.sprint_mode and self.sprint_start_time: + # In sprint mode, calculate TPS based on total sprint duration + sprint_duration = current_time - self.sprint_start_time + if sprint_duration > 0: + self.actual_tps = self.tick_counter / sprint_duration + else: + self.actual_tps = 0 + else: + # Normal mode TPS calculation using sliding window for more responsive display + elapsed_time = current_time - self.last_tps_time + + # Update TPS more frequently for responsive display + if elapsed_time >= 0.5: # Update every 500ms instead of 1 second + # Calculate actual TPS based on elapsed time + self.actual_tps = self.tick_counter / elapsed_time + + # Reset for next measurement period + self.last_tps_time = current_time + self.tick_counter = 0 + + # Update last tick time for next tick calculation - advance by tick interval + if not self.state.sprint_mode: + tick_interval = 1.0 / self.state.tps + self.last_tick_time += tick_interval + + self._notify_timing_update() + + def get_display_tps(self) -> int: + """Get TPS rounded to nearest whole number for display.""" + return round(self.actual_tps) + + def reset_counters(self): + """Reset timing counters.""" + self.tick_counter = 0 + self.total_ticks = 0 + self.actual_tps = 0 + self.last_tick_time = time.perf_counter() + self.last_tps_time = time.perf_counter() + if self.state.sprint_mode: + self.sprint_start_time = time.perf_counter() + + def get_tick_interval(self) -> float: + """Get current tick interval in seconds.""" + return 1.0 / self.state.tps if self.state.tps > 0 else 1.0 + + def get_current_speed_display(self) -> str: + """Get current speed display string.""" + if self.state.sprint_mode: + return "Sprint" + elif self.state.is_paused: + return "Paused" + elif self.state.speed_multiplier == 1.0: + return "1x" + elif self.state.speed_multiplier in [0.5, 2.0, 4.0, 8.0]: + return f"{self.state.speed_multiplier}x" + else: + return f"{self.state.speed_multiplier:.1f}x" + + def _notify_state_change(self): + """Notify subscribers of state change.""" + if self.event_bus: + event = Event( + type=EventType.SIMULATION_STATE_CHANGED, + data={ + 'timing_state': self.state, + 'tps': self.state.tps, + 'is_paused': self.state.is_paused, + 'sprint_mode': self.state.sprint_mode, + 'speed_multiplier': self.state.speed_multiplier + } + ) + self.event_bus.publish(event) + + def _notify_timing_update(self): + """Notify subscribers of timing update.""" + if self.event_bus: + event = Event( + type=EventType.TIMING_UPDATE, + data={ + 'actual_tps': self.actual_tps, + 'total_ticks': self.total_ticks, + 'tick_counter': self.tick_counter + } + ) + self.event_bus.publish(event) \ No newline at end of file diff --git a/engines/__init__.py b/engines/__init__.py new file mode 100644 index 0000000..9d92b86 --- /dev/null +++ b/engines/__init__.py @@ -0,0 +1,5 @@ +"""Simulation engines for different modes.""" + +from .headless_engine import HeadlessSimulationEngine, HeadlessConfig + +__all__ = ['HeadlessSimulationEngine', 'HeadlessConfig'] \ No newline at end of file diff --git a/engines/headless_engine.py b/engines/headless_engine.py new file mode 100644 index 0000000..b7da449 --- /dev/null +++ b/engines/headless_engine.py @@ -0,0 +1,274 @@ +"""Headless simulation engine for running simulations without UI.""" + +import time +import signal +import sys +from typing import Dict, Any, Optional, List +from dataclasses import dataclass + +from core.simulation_core import SimulationCore, SimulationConfig +from core.event_bus import EventBus +from output import MetricsCollector, EntityCollector, EvolutionCollector +from output.formatters.json_formatter import JSONFormatter +from output.formatters.csv_formatter import CSVFormatter +from output.writers.file_writer import FileWriter + + +@dataclass +class HeadlessConfig: + """Configuration for headless simulation engine.""" + simulation: SimulationConfig + max_ticks: Optional[int] = None + max_duration: Optional[float] = None # seconds + output_dir: str = "simulation_output" + enable_metrics: bool = True + enable_entities: bool = True + enable_evolution: bool = True + metrics_interval: int = 100 + entities_interval: int = 1000 + evolution_interval: int = 1000 + output_formats: List[str] = None # ['json', 'csv'] + real_time: bool = False # Whether to run in real-time or as fast as possible + + def __post_init__(self): + if self.output_formats is None: + self.output_formats = ['json'] + + +class HeadlessSimulationEngine: + """Headless simulation engine with data collection capabilities.""" + + def __init__(self, config: HeadlessConfig): + self.config = config + self.event_bus = EventBus() + self.simulation_core = SimulationCore(config.simulation, self.event_bus) + self.file_writer = FileWriter(config.output_dir) + self.formatters = self._create_formatters() + self.collectors = self._create_collectors() + + # Runtime state + self.running = False + self.start_time = None + self.tick_data = {} + self.batch_data = { + 'metrics': [], + 'entities': [], + 'evolution': [] + } + + # Setup signal handlers for graceful shutdown + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + def _create_formatters(self) -> Dict[str, Any]: + """Create data formatters.""" + formatters = {} + if 'json' in self.config.output_formats: + formatters['json'] = JSONFormatter() + if 'csv' in self.config.output_formats: + formatters['csv'] = CSVFormatter() + return formatters + + def _create_collectors(self) -> Dict[str, Any]: + """Create data collectors.""" + collectors = {} + + if self.config.enable_metrics: + collectors['metrics'] = MetricsCollector(self.config.metrics_interval) + + if self.config.enable_entities: + collectors['entities'] = EntityCollector( + self.config.entities_interval, + include_cells=True, + include_food=False + ) + + if self.config.enable_evolution: + collectors['evolution'] = EvolutionCollector(self.config.evolution_interval) + + return collectors + + def run(self) -> Dict[str, Any]: + """Run the headless simulation.""" + # Determine if we should run at max speed + max_speed_mode = not self.config.real_time and self.config.simulation.default_tps >= 1000 + + print(f"Starting headless simulation...") + print(f"Output directory: {self.config.output_dir}") + print(f"Max ticks: {self.config.max_ticks or 'unlimited'}") + print(f"Max duration: {self.config.max_duration or 'unlimited'} seconds") + print(f"Real-time mode: {self.config.real_time}") + print(f"Speed mode: {'Maximum speed' if max_speed_mode else f'{self.config.simulation.default_tps} TPS'}") + print(f"Output formats: {', '.join(self.config.output_formats)}") + print(f"Collectors: {', '.join(self.collectors.keys())}") + print() + + self.running = True + self.start_time = time.time() + self.simulation_core.start() + + # Enable sprint mode for maximum speed if not real-time mode + if max_speed_mode: + self.simulation_core.timing.set_sprint_mode(True) + print("Running at maximum speed (sprint mode enabled)") + + last_batch_time = time.time() + batch_interval = 5.0 # Write batch data every 5 seconds + + try: + while self.running: + # Check termination conditions + if self._should_terminate(): + break + + # Update simulation + if max_speed_mode: + # In max speed mode, update as fast as possible + self.simulation_core.update(0.0) + else: + # Normal timing-based updates + self.simulation_core.update(0.016) # ~60 FPS equivalent + + # Collect data + self._collect_data() + + # Write batch data periodically + if time.time() - last_batch_time >= batch_interval: + self._write_batch_data() + last_batch_time = time.time() + + # Real-time delay if needed + if self.config.real_time: + time.sleep(0.016) # ~60 FPS + + except KeyboardInterrupt: + print("\nSimulation interrupted by user") + except Exception as e: + print(f"Simulation error: {e}") + import traceback + traceback.print_exc() + finally: + self._finalize() + + return self._get_summary() + + def _should_terminate(self) -> bool: + """Check if simulation should terminate.""" + # Check max ticks + if self.config.max_ticks and self.simulation_core.state.total_ticks >= self.config.max_ticks: + print(f"Reached max ticks: {self.config.max_ticks}") + return True + + # Check max duration + if self.config.max_duration: + elapsed = time.time() - self.start_time + if elapsed >= self.config.max_duration: + print(f"Reached max duration: {self.config.max_duration} seconds") + return True + + return False + + def _collect_data(self): + """Collect data from all collectors.""" + for collector_name, collector in self.collectors.items(): + data_list = collector.update(self.simulation_core) + for data in data_list: + self.batch_data[collector_name].append(data) + + def _write_batch_data(self): + """Write collected data to files.""" + if not self.file_writer.is_ready(): + return + + for collector_name, data_list in self.batch_data.items(): + if not data_list: + continue + + for format_name, formatter in self.formatters.items(): + # Group data by tick and write one file per tick + data_by_tick = {} + for data in data_list: + tick = data.get('tick_count', data.get('tick', self.simulation_core.state.total_ticks)) + if tick not in data_by_tick: + data_by_tick[tick] = [] + data_by_tick[tick].append(data) + + # Write one file per tick for this collector + for tick, tick_data in data_by_tick.items(): + filename = f"{collector_name}_tick{tick}.{formatter.get_file_extension()}" + # If multiple data items for same tick, combine them or write the latest one + combined_data = tick_data[-1] if len(tick_data) == 1 else { + 'timestamp': tick_data[0].get('timestamp'), + 'tick_count': tick, + 'collection_type': collector_name, + 'multiple_entries': len(tick_data), + 'data': tick_data + } + formatted_data = formatter.format(combined_data) + self.file_writer.write(formatted_data, filename) + + # Clear written data + data_list.clear() + + print(f"Wrote batch data at tick {self.simulation_core.state.total_ticks}") + + def _finalize(self): + """Finalize simulation and write remaining data.""" + print("Finalizing simulation...") + + # Write any remaining data + self._write_batch_data() + + # Write final summary + summary = self._get_summary() + if 'json' in self.formatters: + summary_data = self.formatters['json'].format(summary) + self.file_writer.write(summary_data, "simulation_summary.json") + + # Stop simulation + self.simulation_core.stop() + self.file_writer.close() + + print("Simulation completed") + + def _get_summary(self) -> Dict[str, Any]: + """Get simulation summary.""" + duration = time.time() - self.start_time if self.start_time else 0 + world_state = self.simulation_core.get_world_state() + + return { + 'simulation_config': { + 'grid_width': self.config.simulation.grid_width, + 'grid_height': self.config.simulation.grid_height, + 'initial_cells': self.config.simulation.initial_cells, + 'initial_food': self.config.simulation.initial_food, + 'default_tps': self.config.simulation.default_tps + }, + 'runtime': { + 'duration_seconds': duration, + 'total_ticks': self.simulation_core.state.total_ticks, + 'average_tps': self.simulation_core.state.total_ticks / duration if duration > 0 else 0, + 'final_actual_tps': self.simulation_core.state.actual_tps + }, + 'final_state': world_state, + 'data_collection': { + 'collectors_used': list(self.collectors.keys()), + 'output_formats': self.config.output_formats, + 'output_directory': self.config.output_dir + } + } + + def _signal_handler(self, signum, frame): + """Handle shutdown signals.""" + print(f"\nReceived signal {signum}, shutting down gracefully...") + self.running = False + + def get_real_time_status(self) -> Dict[str, Any]: + """Get current simulation status (useful for monitoring).""" + return { + 'running': self.running, + 'ticks': self.simulation_core.state.total_ticks, + 'tps': self.simulation_core.state.actual_tps, + 'duration': time.time() - self.start_time if self.start_time else 0, + 'world_state': self.simulation_core.get_world_state() + } \ No newline at end of file diff --git a/entry_points/__init__.py b/entry_points/__init__.py new file mode 100644 index 0000000..5cc4c5a --- /dev/null +++ b/entry_points/__init__.py @@ -0,0 +1,3 @@ +"""Entry points for different simulation modes.""" + +__all__ = [] \ No newline at end of file diff --git a/entry_points/headless_main.py b/entry_points/headless_main.py new file mode 100644 index 0000000..4b1f093 --- /dev/null +++ b/entry_points/headless_main.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +"""Headless simulation entry point - runs simulations without UI.""" + +import sys +import argparse +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from config import ConfigLoader, HeadlessConfig +from engines import HeadlessSimulationEngine, HeadlessConfig as EngineHeadlessConfig + + +def main(): + """Main entry point for headless simulation.""" + parser = argparse.ArgumentParser(description="Run headless simulation") + parser.add_argument( + "--config", "-c", + type=str, + help="Path to configuration file (JSON/YAML)", + default=None + ) + parser.add_argument( + "--output-dir", "-o", + type=str, + help="Output directory for simulation data", + default="simulation_output" + ) + parser.add_argument( + "--max-ticks", "-t", + type=int, + help="Maximum number of ticks to run", + default=None + ) + parser.add_argument( + "--max-duration", "-d", + type=float, + help="Maximum duration in seconds", + default=None + ) + parser.add_argument( + "--tps", + type=float, + help="Target TPS for simulation (default: unlimited speed)", + default=None + ) + parser.add_argument( + "--real-time", + action="store_true", + help="Run in real-time mode (instead of as fast as possible)" + ) + parser.add_argument( + "--collect-every-tick", + action="store_true", + help="Collect data on every tick instead of at intervals" + ) + parser.add_argument( + "--create-sample-configs", + action="store_true", + help="Create sample configuration files and exit" + ) + + args = parser.parse_args() + + # Create sample configs if requested + if args.create_sample_configs: + ConfigLoader.create_sample_configs() + return + + # Load configuration + try: + if args.config: + headless_config = ConfigLoader.load_headless_config(args.config) + else: + headless_config = ConfigLoader.get_default_headless_config() + + # Override config with command line arguments + if args.max_ticks: + headless_config.max_ticks = args.max_ticks + if args.max_duration: + headless_config.max_duration = args.max_duration + if args.tps is not None: + headless_config.simulation.default_tps = args.tps + else: + # No TPS specified - run at maximum speed + headless_config.simulation.default_tps = 999999999.0 + if args.output_dir: + headless_config.output.directory = args.output_dir + if args.real_time: + headless_config.output.real_time = True + if args.collect_every_tick: + # Set all collection intervals to 1 for every-tick collection + headless_config.output.metrics_interval = 1 + headless_config.output.entities_interval = 1 + headless_config.output.evolution_interval = 1 + + except Exception as e: + print(f"Error loading configuration: {e}") + sys.exit(1) + + # Create engine configuration + engine_config = EngineHeadlessConfig( + simulation=headless_config.simulation, + max_ticks=headless_config.max_ticks, + max_duration=headless_config.max_duration, + output_dir=headless_config.output.directory, + enable_metrics=headless_config.output.collect_metrics, + enable_entities=headless_config.output.collect_entities, + enable_evolution=headless_config.output.collect_evolution, + metrics_interval=headless_config.output.metrics_interval, + entities_interval=headless_config.output.entities_interval, + evolution_interval=headless_config.output.evolution_interval, + output_formats=headless_config.output.formats, + real_time=headless_config.output.real_time + ) + + # Create and run simulation + try: + print("Starting headless simulation...") + engine = HeadlessSimulationEngine(engine_config) + summary = engine.run() + + # Print summary + print("\n" + "="*50) + print("SIMULATION SUMMARY") + print("="*50) + print(f"Total ticks: {summary['runtime']['total_ticks']}") + print(f"Duration: {summary['runtime']['duration_seconds']:.2f} seconds") + print(f"Average TPS: {summary['runtime']['average_tps']:.2f}") + print(f"Final entity counts: {summary['final_state']['entity_counts']}") + print(f"Output directory: {summary['data_collection']['output_directory']}") + + except KeyboardInterrupt: + print("\nSimulation interrupted by user") + sys.exit(0) + except Exception as e: + print(f"Simulation error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/entry_points/interactive_main.py b/entry_points/interactive_main.py new file mode 100644 index 0000000..bdbe657 --- /dev/null +++ b/entry_points/interactive_main.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""Interactive simulation entry point - runs simulation with UI.""" + +import sys +import argparse +from pathlib import Path + +# Add project root to path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from config import ConfigLoader, InteractiveConfig +from core.simulation_engine import SimulationEngine + + +def main(): + """Main entry point for interactive simulation.""" + parser = argparse.ArgumentParser(description="Run interactive simulation with UI") + parser.add_argument( + "--config", "-c", + type=str, + help="Path to configuration file (JSON/YAML)", + default=None + ) + parser.add_argument( + "--create-sample-configs", + action="store_true", + help="Create sample configuration files and exit" + ) + + args = parser.parse_args() + + # Create sample configs if requested + if args.create_sample_configs: + ConfigLoader.create_sample_configs() + return + + # Load configuration + try: + if args.config: + config = ConfigLoader.load_interactive_config(args.config) + else: + config = ConfigLoader.get_default_interactive_config() + + except Exception as e: + print(f"Error loading configuration: {e}") + sys.exit(1) + + # Run simulation + try: + print("Starting interactive simulation...") + engine = SimulationEngine() + engine.run() + + except KeyboardInterrupt: + print("\nSimulation interrupted by user") + sys.exit(0) + except Exception as e: + print(f"Simulation error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/output/__init__.py b/output/__init__.py new file mode 100644 index 0000000..d73eedd --- /dev/null +++ b/output/__init__.py @@ -0,0 +1,17 @@ +"""Data collection and output system for headless simulations.""" + +from .collectors.metrics_collector import MetricsCollector +from .collectors.entity_collector import EntityCollector +from .collectors.evolution_collector import EvolutionCollector +from .formatters.json_formatter import JSONFormatter +from .formatters.csv_formatter import CSVFormatter +from .writers.file_writer import FileWriter + +__all__ = [ + 'MetricsCollector', + 'EntityCollector', + 'EvolutionCollector', + 'JSONFormatter', + 'CSVFormatter', + 'FileWriter' +] \ No newline at end of file diff --git a/output/collectors/base_collector.py b/output/collectors/base_collector.py new file mode 100644 index 0000000..32b2f18 --- /dev/null +++ b/output/collectors/base_collector.py @@ -0,0 +1,44 @@ +"""Base data collector interface.""" + +from abc import ABC, abstractmethod +from typing import Any, Dict, List +from ..formatters.base_formatter import BaseFormatter +from ..writers.base_writer import BaseWriter + + +class BaseCollector(ABC): + """Base class for data collectors.""" + + def __init__(self, collection_interval: int = 100): + self.collection_interval = collection_interval + self.last_collected_tick = -1 + self.data_buffer: List[Dict[str, Any]] = [] + + @abstractmethod + def collect(self, simulation_core) -> Dict[str, Any]: + """Collect data from simulation core.""" + pass + + def should_collect(self, current_tick: int) -> bool: + """Check if data should be collected on current tick.""" + return current_tick - self.last_collected_tick >= self.collection_interval + + def update(self, simulation_core) -> List[Dict[str, Any]]: + """Update collector and return collected data if interval reached.""" + current_tick = simulation_core.state.total_ticks + if self.should_collect(current_tick): + data = self.collect(simulation_core) + self.data_buffer.append(data) + self.last_collected_tick = current_tick + return [data] + return [] + + def get_buffered_data(self) -> List[Dict[str, Any]]: + """Get all buffered data and clear buffer.""" + data = self.data_buffer.copy() + self.data_buffer.clear() + return data + + def clear_buffer(self): + """Clear the data buffer.""" + self.data_buffer.clear() \ No newline at end of file diff --git a/output/collectors/entity_collector.py b/output/collectors/entity_collector.py new file mode 100644 index 0000000..a9a3da3 --- /dev/null +++ b/output/collectors/entity_collector.py @@ -0,0 +1,72 @@ +"""Entity state collector for detailed entity tracking.""" + +from typing import Dict, Any, List +from .base_collector import BaseCollector +from world.objects import DefaultCell + + +class EntityCollector(BaseCollector): + """Collects detailed entity state information.""" + + def __init__(self, collection_interval: int = 1000, include_cells: bool = True, include_food: bool = False): + super().__init__(collection_interval) + self.include_cells = include_cells + self.include_food = include_food + + def collect(self, simulation_core) -> Dict[str, Any]: + """Collect entity states from simulation core.""" + world_state = simulation_core.get_world_state() + entities = [] + + for entity_data in simulation_core.get_entity_states(): + entity_type = entity_data['type'] + + # Filter by entity type based on configuration + if entity_type == 'cell' and not self.include_cells: + continue + elif entity_type == 'food' and not self.include_food: + continue + + entities.append(entity_data) + + # Calculate additional statistics for cells + cell_stats = {} + if self.include_cells: + cells = [e for e in entities if e['type'] == 'cell'] + if cells: + energies = [c['energy'] for c in cells] + ages = [c['age'] for c in cells] + generations = [c['generation'] for c in cells] + + cell_stats = { + 'avg_energy': sum(energies) / len(energies), + 'max_energy': max(energies), + 'min_energy': min(energies), + 'avg_age': sum(ages) / len(ages), + 'max_age': max(ages), + 'avg_generation': sum(generations) / len(generations), + 'max_generation': max(generations) + } + + # Calculate food statistics + food_stats = {} + if self.include_food: + foods = [e for e in entities if e['type'] == 'food'] + if foods: + decays = [f['decay'] for f in foods] + food_stats = { + 'avg_decay': sum(decays) / len(decays), + 'max_decay': max(decays), + 'min_decay': min(decays), + 'fresh_food': len([f for f in foods if f['decay'] < f['max_decay'] * 0.5]) + } + + return { + 'timestamp': simulation_core.timing.last_tick_time, + 'tick_count': world_state['tick_count'], + 'entity_count': len(entities), + 'entities': entities, + 'cell_statistics': cell_stats, + 'food_statistics': food_stats, + 'collection_type': 'entities' + } \ No newline at end of file diff --git a/output/collectors/evolution_collector.py b/output/collectors/evolution_collector.py new file mode 100644 index 0000000..6181dfc --- /dev/null +++ b/output/collectors/evolution_collector.py @@ -0,0 +1,100 @@ +"""Evolution data collector for tracking neural network changes.""" + +from typing import Dict, Any, List +from .base_collector import BaseCollector +from world.objects import DefaultCell +import numpy as np + + +class EvolutionCollector(BaseCollector): + """Collects evolution and neural network data.""" + + def __init__(self, collection_interval: int = 1000): + super().__init__(collection_interval) + + def collect(self, simulation_core) -> Dict[str, Any]: + """Collect evolution data from simulation core.""" + world_state = simulation_core.get_world_state() + + cells_data = [] + network_architectures = {} + total_weights = [] + layer_sizes_common = {} + + for entity_data in simulation_core.get_entity_states(): + if entity_data['type'] == 'cell': + cells_data.append(entity_data) + + # Track neural network architecture + nn_data = entity_data['neural_network'] + layers_key = str(nn_data['layer_sizes']) # Convert to string for JSON compatibility + + if layers_key not in network_architectures: + network_architectures[layers_key] = 0 + network_architectures[layers_key] += 1 + + # Use layer count as a proxy for complexity + total_weights.append(len(nn_data['layer_sizes'])) + + # Track layer size frequencies + for size in nn_data['layer_sizes']: + if size not in layer_sizes_common: + layer_sizes_common[size] = 0 + layer_sizes_common[size] += 1 + + # Calculate evolution statistics + evolution_stats = {} + if cells_data: + energies = [c['energy'] for c in cells_data] + ages = [c['age'] for c in cells_data] + generations = [c['generation'] for c in cells_data] + + evolution_stats = { + 'cell_count': len(cells_data), + 'energy_distribution': { + 'mean': np.mean(energies), + 'std': np.std(energies), + 'min': min(energies), + 'max': max(energies), + 'median': np.median(energies) + }, + 'age_distribution': { + 'mean': np.mean(ages), + 'std': np.std(ages), + 'min': min(ages), + 'max': max(ages), + 'median': np.median(ages) + }, + 'generation_distribution': { + 'mean': np.mean(generations), + 'std': np.std(generations), + 'min': min(generations), + 'max': max(generations), + 'median': np.median(generations) + } + } + + # Network architecture statistics + network_stats = {} + if total_weights: + network_stats = { + 'architecture_diversity': len(network_architectures), + 'most_common_architecture': max(network_architectures.items(), key=lambda x: x[1]) if network_architectures else None, + 'complexity_distribution': { + 'mean': np.mean(total_weights), + 'std': np.std(total_weights), + 'min': min(total_weights), + 'max': max(total_weights) + }, + 'layer_size_diversity': len(layer_sizes_common), + 'most_common_layer_sizes': sorted(layer_sizes_common.items(), key=lambda x: x[1], reverse=True)[:5] + } + + return { + 'timestamp': simulation_core.timing.last_tick_time, + 'tick_count': world_state['tick_count'], + 'evolution_statistics': evolution_stats, + 'network_statistics': network_stats, + 'network_architectures': dict(network_architectures), + 'collection_type': 'evolution' + } \ No newline at end of file diff --git a/output/collectors/metrics_collector.py b/output/collectors/metrics_collector.py new file mode 100644 index 0000000..c8523f0 --- /dev/null +++ b/output/collectors/metrics_collector.py @@ -0,0 +1,28 @@ +"""Basic metrics collector for simulation statistics.""" + +from typing import Dict, Any +from .base_collector import BaseCollector + + +class MetricsCollector(BaseCollector): + """Collects basic simulation metrics.""" + + def __init__(self, collection_interval: int = 100): + super().__init__(collection_interval) + + def collect(self, simulation_core) -> Dict[str, Any]: + """Collect basic metrics from simulation core.""" + world_state = simulation_core.get_world_state() + + return { + 'timestamp': simulation_core.timing.last_tick_time, + 'tick_count': world_state['tick_count'], + 'actual_tps': world_state['actual_tps'], + 'target_tps': world_state['target_tps'], + 'speed_multiplier': world_state['speed_multiplier'], + 'is_paused': world_state['is_paused'], + 'sprint_mode': world_state['sprint_mode'], + 'world_buffer': world_state['world_buffer'], + 'entity_counts': world_state['entity_counts'], + 'collection_type': 'metrics' + } \ No newline at end of file diff --git a/output/formatters/base_formatter.py b/output/formatters/base_formatter.py new file mode 100644 index 0000000..7a2b414 --- /dev/null +++ b/output/formatters/base_formatter.py @@ -0,0 +1,18 @@ +"""Base formatter interface for output data.""" + +from abc import ABC, abstractmethod +from typing import Any + + +class BaseFormatter(ABC): + """Base class for data formatters.""" + + @abstractmethod + def format(self, data: Any) -> str: + """Format data for output.""" + pass + + @abstractmethod + def get_file_extension(self) -> str: + """Get file extension for this format.""" + pass \ No newline at end of file diff --git a/output/formatters/csv_formatter.py b/output/formatters/csv_formatter.py new file mode 100644 index 0000000..161a307 --- /dev/null +++ b/output/formatters/csv_formatter.py @@ -0,0 +1,83 @@ +"""CSV formatter for tabular output data.""" + +import csv +import io +from typing import Any, List, Dict +from .base_formatter import BaseFormatter + + +class CSVFormatter(BaseFormatter): + """Formats data as CSV.""" + + def __init__(self, flatten_nested: bool = True): + self.flatten_nested = flatten_nested + + def format(self, data: Any) -> str: + """Format data as CSV string.""" + if isinstance(data, list): + return self._format_list(data) + elif isinstance(data, dict): + return self._format_dict(data) + else: + # Single value + return str(data) + + def _format_list(self, data: List[Dict[str, Any]]) -> str: + """Format list of dictionaries as CSV.""" + if not data: + return "" + + # Flatten nested dictionaries if requested + processed_data = [] + for item in data: + if self.flatten_nested: + processed_data.append(self._flatten_dict(item)) + else: + processed_data.append(item) + + # Get all field names from all items + fieldnames = set() + for item in processed_data: + fieldnames.update(item.keys()) + fieldnames = sorted(fieldnames) + + # Create CSV + output = io.StringIO() + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(processed_data) + + return output.getvalue() + + def _format_dict(self, data: Dict[str, Any]) -> str: + """Format single dictionary as CSV.""" + processed_data = self._flatten_dict(data) if self.flatten_nested else data + fieldnames = sorted(processed_data.keys()) + + output = io.StringIO() + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + writer.writerow(processed_data) + + return output.getvalue() + + def _flatten_dict(self, data: Dict[str, Any], parent_key: str = '', sep: str = '_') -> Dict[str, Any]: + """Flatten nested dictionaries.""" + items = [] + for key, value in data.items(): + new_key = f"{parent_key}{sep}{key}" if parent_key else key + if isinstance(value, dict): + items.extend(self._flatten_dict(value, new_key, sep).items()) + elif isinstance(value, list): + # Convert lists to strings or handle each element + if value and isinstance(value[0], (int, float, str)): + items.append((new_key, ','.join(map(str, value)))) + else: + items.append((new_key, str(value))) + else: + items.append((new_key, value)) + return dict(items) + + def get_file_extension(self) -> str: + """Get file extension for CSV format.""" + return "csv" \ No newline at end of file diff --git a/output/formatters/json_formatter.py b/output/formatters/json_formatter.py new file mode 100644 index 0000000..249f77c --- /dev/null +++ b/output/formatters/json_formatter.py @@ -0,0 +1,20 @@ +"""JSON formatter for output data.""" + +import json +from typing import Any +from .base_formatter import BaseFormatter + + +class JSONFormatter(BaseFormatter): + """Formats data as JSON.""" + + def __init__(self, indent: int = 2): + self.indent = indent + + def format(self, data: Any) -> str: + """Format data as JSON string.""" + return json.dumps(data, indent=self.indent, default=str) + + def get_file_extension(self) -> str: + """Get file extension for JSON format.""" + return "json" \ No newline at end of file diff --git a/output/writers/base_writer.py b/output/writers/base_writer.py new file mode 100644 index 0000000..0e1497e --- /dev/null +++ b/output/writers/base_writer.py @@ -0,0 +1,23 @@ +"""Base writer interface for output data.""" + +from abc import ABC, abstractmethod +from typing import Any + + +class BaseWriter(ABC): + """Base class for data writers.""" + + @abstractmethod + def write(self, data: Any) -> bool: + """Write data to output destination.""" + pass + + @abstractmethod + def close(self): + """Close writer and cleanup resources.""" + pass + + @abstractmethod + def is_ready(self) -> bool: + """Check if writer is ready for writing.""" + pass \ No newline at end of file diff --git a/output/writers/file_writer.py b/output/writers/file_writer.py new file mode 100644 index 0000000..3041c44 --- /dev/null +++ b/output/writers/file_writer.py @@ -0,0 +1,140 @@ +"""File writer for output data.""" + +import os +from pathlib import Path +from typing import Any, List, Optional +from .base_writer import BaseWriter + + +class FileWriter(BaseWriter): + """Writes data to files.""" + + def __init__(self, output_dir: str = "simulation_output", create_dirs: bool = True): + self.output_dir = Path(output_dir) + self.create_dirs = create_dirs + self.file_handles = {} + self.ready = False + + if self.create_dirs: + self.output_dir.mkdir(parents=True, exist_ok=True) + + self.ready = self.output_dir.exists() + + def write(self, data: Any, filename: Optional[str] = None, data_type: str = "data") -> bool: + """Write data to file.""" + if not self.ready: + return False + + try: + if filename is None: + # Generate filename based on data type and timestamp + import time + timestamp = int(time.time()) + filename = f"{data_type}_{timestamp}.txt" + + filepath = self.output_dir / filename + + # Handle different data types + if isinstance(data, str): + # Already formatted data + with open(filepath, 'w', encoding='utf-8') as f: + f.write(data) + elif isinstance(data, (list, dict)): + # Convert to string representation + with open(filepath, 'w', encoding='utf-8') as f: + f.write(str(data)) + else: + # Single value + with open(filepath, 'w', encoding='utf-8') as f: + f.write(str(data)) + + return True + + except Exception as e: + print(f"Error writing to file {filename}: {e}") + return False + + def write_batch(self, data_list: List[Any], filename: str, data_type: str = "batch") -> bool: + """Write multiple data items to a single file.""" + if not self.ready or not data_list: + return False + + try: + filepath = self.output_dir / filename + + # Combine all data items + if isinstance(data_list[0], str): + # Already formatted strings + content = '\n'.join(data_list) + else: + # Convert to string representation + content = '\n'.join(str(item) for item in data_list) + + with open(filepath, 'w', encoding='utf-8') as f: + f.write(content) + + return True + + except Exception as e: + print(f"Error writing batch to file {filename}: {e}") + return False + + def append(self, data: Any, filename: str) -> bool: + """Append data to existing file.""" + if not self.ready: + return False + + try: + filepath = self.output_dir / filename + + with open(filepath, 'a', encoding='utf-8') as f: + f.write(str(data)) + f.write('\n') # Add newline + + return True + + except Exception as e: + print(f"Error appending to file {filename}: {e}") + return False + + def get_file_path(self, filename: str) -> Path: + """Get full path for a filename.""" + return self.output_dir / filename + + def list_files(self, pattern: str = "*") -> List[str]: + """List files in output directory.""" + if not self.ready: + return [] + + try: + return [f.name for f in self.output_dir.glob(pattern) if f.is_file()] + except Exception: + return [] + + def delete_file(self, filename: str) -> bool: + """Delete a file.""" + if not self.ready: + return False + + try: + filepath = self.output_dir / filename + if filepath.exists(): + filepath.unlink() + return True + return False + except Exception as e: + print(f"Error deleting file {filename}: {e}") + return False + + def close(self): + """Close file handles and cleanup.""" + for handle in self.file_handles.values(): + try: + handle.close() + except: + pass + self.file_handles.clear() + + def is_ready(self) -> bool: + """Check if writer is ready for writing.""" + return self.ready \ No newline at end of file diff --git a/ui/hud.py b/ui/hud.py index 53757b8..d9d436f 100644 --- a/ui/hud.py +++ b/ui/hud.py @@ -6,12 +6,18 @@ import pygame_gui from config.constants import * from world.base.brain import CellBrain, FlexibleNeuralNetwork from world.objects import DefaultCell -from pygame_gui.elements import UIPanel +from pygame_gui.elements import UIPanel, UIButton, UITextEntryLine, UILabel import math DARK_GRAY = (40, 40, 40) DARKER_GRAY = (25, 25, 25) +# Panel visibility constants +SHOW_CONTROL_BAR = True +SHOW_INSPECTOR_PANEL = False +SHOW_PROPERTIES_PANEL = True +SHOW_CONSOLE_PANEL = False + class HUD: def __init__(self, ui_manager, screen_width=SCREEN_WIDTH, screen_height=SCREEN_HEIGHT): self.font = pygame.font.Font("freesansbold.ttf", FONT_SIZE) @@ -29,90 +35,307 @@ class HUD: self.splitter_thickness = 6 self.dragging_splitter = None + + # Simulation control elements + self.play_pause_button = None + self.step_button = None + self.sprint_button = None + self.speed_buttons = {} + self.custom_tps_entry = None + self.tps_label = None + self._create_panels() + self._create_simulation_controls() def _create_panels(self): + self.panels = [] + # Top control bar - self.control_bar = UIPanel( - relative_rect=pygame.Rect(0, 0, self.screen_width, self.control_bar_height), - manager=self.manager, - object_id="#control_bar", - ) + if SHOW_CONTROL_BAR: + self.control_bar = UIPanel( + relative_rect=pygame.Rect(0, 0, self.screen_width, self.control_bar_height), + manager=self.manager, + object_id="#control_bar", + ) + self.panels.append(self.control_bar) + else: + self.control_bar = None # Left inspector - self.inspector_panel = UIPanel( - relative_rect=pygame.Rect( - 0, self.control_bar_height, - self.inspector_width, - self.screen_height - self.control_bar_height - ), - manager=self.manager, - object_id="#inspector_panel", - ) + if SHOW_INSPECTOR_PANEL: + self.inspector_panel = UIPanel( + relative_rect=pygame.Rect( + 0, self.control_bar_height if SHOW_CONTROL_BAR else 0, + self.inspector_width, + self.screen_height - (self.control_bar_height if SHOW_CONTROL_BAR else 0) + ), + manager=self.manager, + object_id="#inspector_panel", + ) + self.panels.append(self.inspector_panel) + else: + self.inspector_panel = None # Right properties - self.properties_panel = UIPanel( - relative_rect=pygame.Rect( - self.screen_width - self.properties_width, - self.control_bar_height, - self.properties_width, - self.screen_height - self.control_bar_height - ), - manager=self.manager, - object_id="#properties_panel", - ) + if SHOW_PROPERTIES_PANEL: + self.properties_panel = UIPanel( + relative_rect=pygame.Rect( + self.screen_width - self.properties_width, + self.control_bar_height if SHOW_CONTROL_BAR else 0, + self.properties_width, + self.screen_height - (self.control_bar_height if SHOW_CONTROL_BAR else 0) + ), + manager=self.manager, + object_id="#properties_panel", + ) + self.panels.append(self.properties_panel) + else: + self.properties_panel = None # Bottom console - self.console_panel = UIPanel( - relative_rect=pygame.Rect( - self.inspector_width, - self.screen_height - self.console_height, - self.screen_width - self.inspector_width - self.properties_width, - self.console_height - ), + if SHOW_CONSOLE_PANEL: + self.console_panel = UIPanel( + relative_rect=pygame.Rect( + self.inspector_width if SHOW_INSPECTOR_PANEL else 0, + self.screen_height - self.console_height, + self.screen_width - (self.inspector_width if SHOW_INSPECTOR_PANEL else 0) - (self.properties_width if SHOW_PROPERTIES_PANEL else 0), + self.console_height + ), + manager=self.manager, + object_id="#console_panel", + ) + self.panels.append(self.console_panel) + else: + self.console_panel = None + + self.dragging_splitter = None + + def _create_simulation_controls(self): + """Create simulation control buttons in the control bar.""" + if not self.control_bar: + return + + # Button layout constants + button_width = 40 + button_height = 32 + button_spacing = 8 + start_x = 20 + start_y = 8 + + # Play/Pause button + self.play_pause_button = UIButton( + relative_rect=pygame.Rect(start_x, start_y, button_width, button_height), + text='>', manager=self.manager, - object_id="#console_panel", + container=self.control_bar, + object_id="#play_pause_button" ) - self.panels = [ - self.control_bar, - self.inspector_panel, - self.properties_panel, - self.console_panel - ] - self.dragging_splitter = None + # Step forward button + step_x = start_x + button_width + button_spacing + self.step_button = UIButton( + relative_rect=pygame.Rect(step_x, start_y, button_width, button_height), + text='>>', + manager=self.manager, + container=self.control_bar, + object_id="#step_button" + ) + + # Sprint button + sprint_x = step_x + button_width + button_spacing + 5 # Extra spacing + self.sprint_button = UIButton( + relative_rect=pygame.Rect(sprint_x, start_y, button_width + 10, button_height), + text='>>|', + manager=self.manager, + container=self.control_bar, + object_id="#sprint_button" + ) + + # Speed control buttons + speed_labels = ["0.5x", "1x", "2x", "4x", "8x"] + speed_multipliers = [0.5, 1.0, 2.0, 4.0, 8.0] + speed_x = sprint_x + button_width + 10 + button_spacing + 10 # Extra spacing + + for i, (label, multiplier) in enumerate(zip(speed_labels, speed_multipliers)): + button_x = speed_x + i * (button_width - 5 + button_spacing) + button = UIButton( + relative_rect=pygame.Rect(button_x, start_y, button_width - 5, button_height), + text=label, + manager=self.manager, + container=self.control_bar, + object_id=f"#speed_{int(multiplier*10)}x_button" + ) + self.speed_buttons[multiplier] = button + + # Custom TPS input + tps_x = speed_x + len(speed_labels) * (button_width - 5 + button_spacing) + button_spacing + self.custom_tps_entry = UITextEntryLine( + relative_rect=pygame.Rect(tps_x, start_y + 2, 50, button_height - 4), + manager=self.manager, + container=self.control_bar, + object_id="#custom_tps_entry" + ) + self.custom_tps_entry.set_text(str(DEFAULT_TPS)) + + # TPS display label + tps_label_x = tps_x + 55 + self.tps_label = UILabel( + relative_rect=pygame.Rect(tps_label_x, start_y + 4, 80, button_height - 8), + text='TPS: 40', + manager=self.manager, + container=self.control_bar, + object_id="#tps_label" + ) def get_viewport_rect(self): # Returns the rect for the simulation viewport - x = self.inspector_width - y = self.control_bar_height - w = self.screen_width - self.inspector_width - self.properties_width - h = self.screen_height - self.control_bar_height - self.console_height + inspector_width = self.inspector_width if SHOW_INSPECTOR_PANEL and self.inspector_panel else 0 + control_bar_height = self.control_bar_height if SHOW_CONTROL_BAR and self.control_bar else 0 + properties_width = self.properties_width if SHOW_PROPERTIES_PANEL and self.properties_panel else 0 + console_height = self.console_height if SHOW_CONSOLE_PANEL and self.console_panel else 0 + + x = inspector_width + y = control_bar_height + w = self.screen_width - inspector_width - properties_width + h = self.screen_height - control_bar_height - console_height return pygame.Rect(x, y, w, h) def update_layout(self, window_width, window_height): self.screen_width = window_width self.screen_height = window_height + control_bar_height = self.control_bar_height if SHOW_CONTROL_BAR and self.control_bar else 0 + # Control bar (top) - self.control_bar.set_relative_position((0, 0)) - self.control_bar.set_dimensions((self.screen_width, self.control_bar_height)) + if self.control_bar: + self.control_bar.set_relative_position((0, 0)) + self.control_bar.set_dimensions((self.screen_width, self.control_bar_height)) - # Inspector panel (left) - goes all the way to the bottom - self.inspector_panel.set_relative_position((0, self.control_bar_height)) - self.inspector_panel.set_dimensions((self.inspector_width, self.screen_height - self.control_bar_height)) + # Inspector panel (left) + if self.inspector_panel: + self.inspector_panel.set_relative_position((0, control_bar_height)) + self.inspector_panel.set_dimensions((self.inspector_width, self.screen_height - control_bar_height)) - # Properties panel (right) - goes all the way to the bottom - self.properties_panel.set_relative_position( - (self.screen_width - self.properties_width, self.control_bar_height)) - self.properties_panel.set_dimensions((self.properties_width, self.screen_height - self.control_bar_height)) + # Properties panel (right) + if self.properties_panel: + self.properties_panel.set_relative_position( + (self.screen_width - self.properties_width, control_bar_height)) + self.properties_panel.set_dimensions((self.properties_width, self.screen_height - control_bar_height)) # Console panel (bottom, spans between inspector and properties) - self.console_panel.set_relative_position((self.inspector_width, self.screen_height - self.console_height)) - self.console_panel.set_dimensions( - (self.screen_width - self.inspector_width - self.properties_width, self.console_height)) + if self.console_panel: + inspector_width = self.inspector_width if SHOW_INSPECTOR_PANEL and self.inspector_panel else 0 + properties_width = self.properties_width if SHOW_PROPERTIES_PANEL and self.properties_panel else 0 + self.console_panel.set_relative_position((inspector_width, self.screen_height - self.console_height)) + self.console_panel.set_dimensions( + (self.screen_width - inspector_width - properties_width, self.console_height)) + + # Recreate simulation controls after layout change + if hasattr(self, 'play_pause_button'): + self._destroy_simulation_controls() + self._create_simulation_controls() + + def _destroy_simulation_controls(self): + """Destroy simulation control elements.""" + if self.play_pause_button: + self.play_pause_button.kill() + if self.step_button: + self.step_button.kill() + if self.sprint_button: + self.sprint_button.kill() + for button in self.speed_buttons.values(): + button.kill() + if self.custom_tps_entry: + self.custom_tps_entry.kill() + if self.tps_label: + self.tps_label.kill() + + self.play_pause_button = None + self.step_button = None + self.sprint_button = None + self.speed_buttons = {} + self.custom_tps_entry = None + self.tps_label = None + + def update_simulation_controls(self, simulation_core): + """Update simulation control button states and displays based on simulation core state.""" + if not self.play_pause_button: + return + + timing_state = simulation_core.timing.state + + # Update play/pause button + if timing_state.is_paused: + self.play_pause_button.set_text('>') + else: + self.play_pause_button.set_text('||') + + # Update speed button highlights + speed_presets = {0.5: "0.5x", 1.0: "1x", 2.0: "2x", 4.0: "4x", 8.0: "8x"} + for multiplier, button in self.speed_buttons.items(): + if (timing_state.speed_multiplier == multiplier and + not timing_state.is_paused and + not timing_state.sprint_mode): + # Active speed button - make text more prominent + button.set_text(f"[{speed_presets[multiplier]}]") + else: + # Normal button appearance + button.set_text(speed_presets[multiplier]) + + # Update sprint button appearance + if timing_state.sprint_mode: + self.sprint_button.set_text('⚡') + else: + self.sprint_button.set_text('⚡') + + # Update TPS display + if self.tps_label: + if timing_state.sprint_mode: + self.tps_label.set_text(f"TPS: {timing_state.tps:.0f} (Sprint)") + else: + self.tps_label.set_text(f"TPS: {timing_state.tps:.0f}") + + # Update custom TPS entry + if self.custom_tps_entry and not self.custom_tps_entry.is_focused: + self.custom_tps_entry.set_text(str(int(timing_state.tps))) def process_event(self, event): + # Handle simulation control button events using ID matching + if event.type == pygame_gui.UI_BUTTON_START_PRESS: + object_id = str(event.ui_object_id) + + if '#play_pause_button' in object_id: + return 'toggle_pause' + elif '#step_button' in object_id: + return 'step_forward' + elif '#sprint_button' in object_id: + return 'toggle_sprint' + elif '#speed_5x_button' in object_id: # 0.5x button + return 'set_speed', 0.5 + elif '#speed_10x_button' in object_id: # 1x button + return 'set_speed', 1.0 + elif '#speed_20x_button' in object_id: # 2x button + return 'set_speed', 2.0 + elif '#speed_40x_button' in object_id: # 4x button + return 'set_speed', 4.0 + elif '#speed_80x_button' in object_id: # 8x button + return 'set_speed', 8.0 + + elif event.type == pygame_gui.UI_TEXT_ENTRY_FINISHED: + object_id = str(event.ui_object_id) + print(f"Text entry finished: {object_id}, text: {event.text}") + if '#custom_tps_entry' in object_id: + try: + tps = float(event.text) + return 'set_custom_tps', tps + except ValueError: + # Invalid TPS value, reset to current TPS + return ('reset_tps_display',) + elif event.type == pygame_gui.UI_TEXT_ENTRY_CHANGED: + object_id = str(event.ui_object_id) + if '#custom_tps_entry' in object_id: + print(f"Text entry changed: {object_id}, text: {event.text}") + # Handle splitter dragging for resizing panels if event.type == pygame.MOUSEBUTTONDOWN and event.button == 1: mx, my = event.pos @@ -143,12 +366,17 @@ class HUD: indicator_gap = 4 # Gap between indicator lines indicator_count = 3 # Number of indicator lines + inspector_width = self.inspector_width if SHOW_INSPECTOR_PANEL and self.inspector_panel else 0 + properties_width = self.properties_width if SHOW_PROPERTIES_PANEL and self.properties_panel else 0 + console_height = self.console_height if SHOW_CONSOLE_PANEL and self.console_panel else 0 + control_bar_height = self.control_bar_height if SHOW_CONTROL_BAR and self.control_bar else 0 + # Vertical splitter (inspector/properties) # Inspector/properties only if wide enough - if self.inspector_width > 0: - x = self.inspector_width - 2 - y1 = self.control_bar_height - y2 = self.screen_height - self.console_height + if inspector_width > 0: + x = inspector_width - 2 + y1 = control_bar_height + y2 = self.screen_height - console_height # Draw indicator (horizontal lines) in the middle mid_y = (y1 + y2) // 2 for i in range(indicator_count): @@ -160,10 +388,10 @@ class HUD: 2 ) - if self.properties_width > 0: - x = self.screen_width - self.properties_width + 2 - y1 = self.control_bar_height - y2 = self.screen_height - self.console_height + if properties_width > 0: + x = self.screen_width - properties_width + 2 + y1 = control_bar_height + y2 = self.screen_height - console_height mid_y = (y1 + y2) // 2 for i in range(indicator_count): offset = (i - 1) * (indicator_gap + 1) @@ -175,10 +403,10 @@ class HUD: ) # Horizontal splitter (console) - if self.console_height > 0: - y = self.screen_height - self.console_height + 2 - x1 = self.inspector_width - x2 = self.screen_width - self.properties_width + if console_height > 0: + y = self.screen_height - console_height + 2 + x1 = inspector_width + x2 = self.screen_width - properties_width mid_x = (x1 + x2) // 2 for i in range(indicator_count): offset = (i - 1) * (indicator_gap + 1) @@ -210,7 +438,8 @@ class HUD: def render_tps(self, screen, actual_tps): """Render TPS in bottom right.""" - tps_text = self.font.render(f"TPS: {actual_tps}", True, WHITE) + display_tps = round(actual_tps) # Round to nearest whole number + tps_text = self.font.render(f"TPS: {display_tps}", True, WHITE) tps_rect = tps_text.get_rect() tps_rect.bottomright = (self.screen_width - HUD_MARGIN, self.screen_height - HUD_MARGIN) screen.blit(tps_text, tps_rect) @@ -644,7 +873,8 @@ class HUD: def render_sprint_debug(self, screen, actual_tps, total_ticks, cell_count=None): """Render sprint debug info: header, TPS, and tick count.""" header = self.font.render("Sprinting...", True, (255, 200, 0)) - tps_text = self.font.render(f"TPS: {actual_tps}", True, (255, 255, 255)) + display_tps = round(actual_tps) # Round to nearest whole number + tps_text = self.font.render(f"TPS: {display_tps}", True, (255, 255, 255)) ticks_text = self.font.render(f"Ticks: {total_ticks}", True, (255, 255, 255)) cell_text = self.font.render(f"Cells: {cell_count}" if cell_count is not None else "Cells: N/A", True, (255, 255, 255))