import numpy as np
import random
from copy import deepcopy
from world.behavioral import BehavioralModel
class FlexibleNeuralNetwork:
"""
A flexible neural network that can mutate its structure and weights.
Supports variable topology with cross-layer connections.
"""
def __init__(self, input_size=2, output_size=2):
self.input_size = input_size
self.output_size = output_size
# Network structure: list of layers, each layer is a list of neurons
# Each neuron is represented by its connections and bias
self.layers = []
# Initialize with just input and output layers (no hidden layers)
self._initialize_basic_network()
def _initialize_basic_network(self):
"""Initialize a basic network with input->output connections only."""
# Input layer (no actual neurons, just placeholders)
input_layer = [{'type': 'input', 'id': i} for i in range(self.input_size)]
# Output layer with connections to all inputs
output_layer = []
for i in range(self.output_size):
neuron = {
'type': 'output',
'id': f'out_{i}',
'bias': random.uniform(-1, 1),
'connections': [] # List of (source_layer, source_neuron, weight)
}
# Connect to all input neurons
for j in range(self.input_size):
neuron['connections'].append((0, j, random.uniform(-2, 2)))
output_layer.append(neuron)
self.layers = [input_layer, output_layer]
def forward(self, inputs):
"""
Forward pass through the network.
:param inputs: List or array of input values
:return: List of output values
"""
if len(inputs) != self.input_size:
raise ValueError(f"Expected {self.input_size} inputs, got {len(inputs)}")
# Store activations for each layer
activations = [inputs] # Input layer activations
# Process each subsequent layer
for layer_idx in range(1, len(self.layers)):
layer_activations = []
            for neuron in self.layers[layer_idx]:
                if neuron['type'] == 'input':
                    continue  # Defensive guard; input placeholders exist only in layer 0
                # Weighted sum of incoming activations plus bias
                weighted_sum = neuron['bias']
                for source_layer, source_neuron, weight in neuron['connections']:
                    # Skip dangling references left behind by structural mutations
                    if source_layer < len(activations) and source_neuron < len(activations[source_layer]):
                        weighted_sum += activations[source_layer][source_neuron] * weight
# Apply activation function (tanh for bounded output)
activation = np.tanh(weighted_sum)
layer_activations.append(activation)
activations.append(layer_activations)
return activations[-1] # Return output layer activations
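
    # Note: every neuron applies tanh, so all activations, including the
    # final outputs, are bounded to the open interval (-1, 1).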
def mutate(self, mutation_rate=0.1):
"""
Create a mutated copy of this network.
        :param mutation_rate: Independent probability of applying each mutation operator
:return: New mutated FlexibleNeuralNetwork instance
"""
mutated = deepcopy(self)
# Different types of mutations
mutations = [
mutated._mutate_weights,
mutated._mutate_biases,
mutated._add_connection,
mutated._remove_connection,
mutated._add_neuron,
mutated._remove_neuron
]
# Apply random mutations
for mutation_func in mutations:
if random.random() < mutation_rate:
mutation_func()
return mutated
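
    # The six operators above fire independently with probability
    # mutation_rate each, so at the default rate of 0.1 an offspring gets
    # 0.6 operator applications on average and skips all six operators
    # with probability 0.9**6 ≈ 0.53.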
def _mutate_weights(self):
"""Slightly modify existing connection weights."""
for layer in self.layers[1:]: # Skip input layer
for neuron in layer:
if 'connections' in neuron:
for i in range(len(neuron['connections'])):
if random.random() < 0.3: # 30% chance to mutate each weight
source_layer, source_neuron, weight = neuron['connections'][i]
# Add small random change
new_weight = weight + random.uniform(-0.5, 0.5)
neuron['connections'][i] = (source_layer, source_neuron, new_weight)
def _mutate_biases(self):
"""Slightly modify neuron biases."""
for layer in self.layers[1:]: # Skip input layer
for neuron in layer:
if 'bias' in neuron and random.random() < 0.3:
neuron['bias'] += random.uniform(-0.5, 0.5)
def _add_connection(self):
"""Add a new random connection."""
if len(self.layers) < 2:
return
# Pick a random target neuron (not in input layer)
target_layer_idx = random.randint(1, len(self.layers) - 1)
target_neuron_idx = random.randint(0, len(self.layers[target_layer_idx]) - 1)
target_neuron = self.layers[target_layer_idx][target_neuron_idx]
if 'connections' not in target_neuron:
return
# Pick a random source (from any previous layer)
source_layer_idx = random.randint(0, target_layer_idx - 1)
if len(self.layers[source_layer_idx]) == 0:
return
source_neuron_idx = random.randint(0, len(self.layers[source_layer_idx]) - 1)
# Check if connection already exists
for conn in target_neuron['connections']:
if conn[0] == source_layer_idx and conn[1] == source_neuron_idx:
return # Connection already exists
# Add new connection
new_weight = random.uniform(-2, 2)
target_neuron['connections'].append((source_layer_idx, source_neuron_idx, new_weight))
def _remove_connection(self):
"""Remove a random connection."""
for layer in self.layers[1:]:
for neuron in layer:
if 'connections' in neuron and len(neuron['connections']) > 1:
if random.random() < 0.1: # 10% chance to remove a connection
neuron['connections'].pop(random.randint(0, len(neuron['connections']) - 1))
def _add_neuron(self):
"""Add a new neuron to a random hidden layer or create a new hidden layer."""
        if random.random() < 0.05:  # Extra 5% gate on top of mutation_rate, keeping structural growth rare
if len(self.layers) == 2: # Only input and output layers
# Create a new hidden layer
hidden_neuron = {
'type': 'hidden',
'id': f'hidden_{random.randint(1000, 9999)}',
'bias': random.uniform(-1, 1),
'connections': []
}
# Connect to some input neurons
for i in range(self.input_size):
if random.random() < 0.7: # 70% chance to connect to each input
hidden_neuron['connections'].append((0, i, random.uniform(-2, 2)))
# Insert hidden layer
self.layers.insert(1, [hidden_neuron])
# Update output layer connections to potentially use new hidden neuron
for neuron in self.layers[-1]: # Output layer
if random.random() < 0.5: # 50% chance to connect to new hidden neuron
neuron['connections'].append((1, 0, random.uniform(-2, 2)))
else:
# Add neuron to existing hidden layer
hidden_layer_idx = random.randint(1, len(self.layers) - 2)
new_neuron = {
'type': 'hidden',
'id': f'hidden_{random.randint(1000, 9999)}',
'bias': random.uniform(-1, 1),
'connections': []
}
# Connect to some neurons from previous layers
for layer_idx in range(hidden_layer_idx):
for neuron_idx in range(len(self.layers[layer_idx])):
if random.random() < 0.3: # 30% chance to connect
new_neuron['connections'].append((layer_idx, neuron_idx, random.uniform(-2, 2)))
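                # The new neuron starts with no outgoing connections; a later
                # _add_connection mutation can wire it into downstream layers.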
self.layers[hidden_layer_idx].append(new_neuron)
    def _remove_neuron(self):
        """Remove at most one random neuron from a hidden layer."""
        if len(self.layers) > 2:  # Has hidden layers
            for layer_idx in range(1, len(self.layers) - 1):  # Only hidden layers
                if len(self.layers[layer_idx]) > 0 and random.random() < 0.02:  # 2% chance
                    neuron_idx = random.randint(0, len(self.layers[layer_idx]) - 1)
                    self.layers[layer_idx].pop(neuron_idx)
                    # Drop connections to the removed neuron and re-index
                    # connections to neurons that shifted down one slot;
                    # otherwise they would silently point at the wrong neuron.
                    for later_layer_idx in range(layer_idx + 1, len(self.layers)):
                        for neuron in self.layers[later_layer_idx]:
                            if 'connections' in neuron:
                                updated = []
                                for src_layer, src_neuron, weight in neuron['connections']:
                                    if src_layer == layer_idx:
                                        if src_neuron == neuron_idx:
                                            continue  # Connection to the removed neuron
                                        if src_neuron > neuron_idx:
                                            src_neuron -= 1  # Re-index after the pop
                                    updated.append((src_layer, src_neuron, weight))
                                neuron['connections'] = updated
                    break
def get_structure_info(self):
"""Return information about the network structure."""
info = {
'total_layers': len(self.layers),
'layer_sizes': [len(layer) for layer in self.layers],
'total_connections': 0,
'total_neurons': sum(len(layer) for layer in self.layers)
}
for layer in self.layers[1:]:
for neuron in layer:
if 'connections' in neuron:
info['total_connections'] += len(neuron['connections'])
return info
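
# Example: a freshly initialized FlexibleNeuralNetwork(input_size=2, output_size=2)
# has no hidden layers, so get_structure_info() returns
#     {'total_layers': 2, 'layer_sizes': [2, 2],
#      'total_connections': 4, 'total_neurons': 4}
# (each of the two output neurons connects to both inputs, and the neuron
# count includes the two input placeholders).
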
class CellBrain(BehavioralModel):
"""
Enhanced CellBrain using a flexible neural network with input normalization.
"""
def __init__(self, neural_network=None, input_ranges=None):
super().__init__()
# Define input and output keys
self.input_keys = ['distance', 'angle']
self.output_keys = ['linear_acceleration', 'angular_acceleration']
# Initialize inputs and outputs
self.inputs = {key: 0.0 for key in self.input_keys}
self.outputs = {key: 0.0 for key in self.output_keys}
# Set input ranges for normalization
default_ranges = {
'distance': (0, 50),
'angle': (-180, 180)
}
self.input_ranges = input_ranges if input_ranges is not None else default_ranges
# Use provided network or create new one
if neural_network is None:
self.neural_network = FlexibleNeuralNetwork(
input_size=len(self.input_keys),
output_size=len(self.output_keys)
)
else:
self.neural_network = neural_network
def _normalize_input(self, key, value):
min_val, max_val = self.input_ranges.get(key, (0.0, 1.0))
# Avoid division by zero
if max_val == min_val:
return 0.0
# Normalize to [-1, 1]
return 2 * (value - min_val) / (max_val - min_val) - 1
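
    # Worked example with the default ranges: distance=25 in (0, 50) maps to
    # 2*25/50 - 1 = 0.0, and angle=90 in (-180, 180) maps to
    # 2*(90 + 180)/360 - 1 = 0.5.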
def tick(self, input_data) -> dict:
"""
Process inputs through neural network and produce outputs.
:param input_data: Dictionary containing input values
:return: Dictionary with output values
"""
# Update internal input state
for key in self.input_keys:
self.inputs[key] = input_data.get(key, 0.0)
# Normalize inputs
input_array = [self._normalize_input(key, self.inputs[key]) for key in self.input_keys]
# Process through neural network
output_array = self.neural_network.forward(input_array)
# Map outputs back to dictionary
self.outputs = {
key: output_array[i] if i < len(output_array) else 0.0
for i, key in enumerate(self.output_keys)
}
return self.outputs.copy()
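
    # Example: tick({'distance': 25, 'angle': 90}) normalizes the inputs to
    # [0.0, 0.5], runs them through the network, and returns a dict like
    # {'linear_acceleration': ..., 'angular_acceleration': ...} with each
    # value in (-1, 1). Missing input keys default to 0.0.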
def mutate(self, mutation_rate=0.1):
"""
Create a mutated copy of this CellBrain.
:param mutation_rate: Rate of mutation for the neural network
:return: New CellBrain with mutated neural network
"""
mutated_network = self.neural_network.mutate(mutation_rate)
return CellBrain(neural_network=mutated_network, input_ranges=self.input_ranges.copy())
def get_network_info(self):
"""Get information about the underlying neural network."""
return self.neural_network.get_structure_info()
def __repr__(self):
inputs = {key: round(value, 5) for key, value in self.inputs.items()}
outputs = {key: round(value, 5) for key, value in self.outputs.items()}
network_info = self.get_network_info()
return (f"CellBrain(inputs={inputs}, outputs={outputs}, "
f"network_layers={network_info['layer_sizes']}, "
f"connections={network_info['total_connections']})")