Sam 3a34759094
Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 8m17s
Add core simulation components and configuration classes
Major rewrite.
2025-11-08 19:17:40 -06:00

44 lines
1.5 KiB
Python

"""Base data collector interface."""
from abc import ABC, abstractmethod
from typing import Any, Dict, List
from ..formatters.base_formatter import BaseFormatter
from ..writers.base_writer import BaseWriter
class BaseCollector(ABC):
"""Base class for data collectors."""
def __init__(self, collection_interval: int = 100):
self.collection_interval = collection_interval
self.last_collected_tick = -1
self.data_buffer: List[Dict[str, Any]] = []
@abstractmethod
def collect(self, simulation_core) -> Dict[str, Any]:
"""Collect data from simulation core."""
pass
def should_collect(self, current_tick: int) -> bool:
"""Check if data should be collected on current tick."""
return current_tick - self.last_collected_tick >= self.collection_interval
def update(self, simulation_core) -> List[Dict[str, Any]]:
"""Update collector and return collected data if interval reached."""
current_tick = simulation_core.state.total_ticks
if self.should_collect(current_tick):
data = self.collect(simulation_core)
self.data_buffer.append(data)
self.last_collected_tick = current_tick
return [data]
return []
def get_buffered_data(self) -> List[Dict[str, Any]]:
"""Get all buffered data and clear buffer."""
data = self.data_buffer.copy()
self.data_buffer.clear()
return data
def clear_buffer(self):
"""Clear the data buffer."""
self.data_buffer.clear()