331 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			331 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import numpy as np
 | |
| import random
 | |
| from copy import deepcopy
 | |
| from world.behavioral import BehavioralModel
 | |
| 
 | |
| 
 | |
| class FlexibleNeuralNetwork:
 | |
|     """
 | |
|     A flexible neural network that can mutate its structure and weights.
 | |
|     Supports variable topology with cross-layer connections.
 | |
|     """
 | |
| 
 | |
|     def __init__(self, input_size=2, output_size=2):
 | |
|         self.input_size = input_size
 | |
|         self.output_size = output_size
 | |
| 
 | |
|         # Network structure: list of layers, each layer is a list of neurons
 | |
|         # Each neuron is represented by its connections and bias
 | |
|         self.layers = []
 | |
| 
 | |
|         # Initialize with just input and output layers (no hidden layers)
 | |
|         self._initialize_basic_network()
 | |
| 
 | |
|     def _initialize_basic_network(self):
 | |
|         """Initialize a basic network with input->output connections only."""
 | |
|         # Input layer (no actual neurons, just placeholders)
 | |
|         input_layer = [{'type': 'input', 'id': i} for i in range(self.input_size)]
 | |
| 
 | |
|         # Output layer with connections to all inputs
 | |
|         output_layer = []
 | |
|         for i in range(self.output_size):
 | |
|             neuron = {
 | |
|                 'type': 'output',
 | |
|                 'id': f'out_{i}',
 | |
|                 'bias': random.uniform(-1, 1),
 | |
|                 'connections': []  # List of (source_layer, source_neuron, weight)
 | |
|             }
 | |
| 
 | |
|             # Connect to all input neurons
 | |
|             for j in range(self.input_size):
 | |
|                 neuron['connections'].append((0, j, random.uniform(-2, 2)))
 | |
| 
 | |
|             output_layer.append(neuron)
 | |
| 
 | |
|         self.layers = [input_layer, output_layer]
 | |
| 
 | |
|     def forward(self, inputs):
 | |
|         """
 | |
|         Forward pass through the network.
 | |
| 
 | |
|         :param inputs: List or array of input values
 | |
|         :return: List of output values
 | |
|         """
 | |
|         if len(inputs) != self.input_size:
 | |
|             raise ValueError(f"Expected {self.input_size} inputs, got {len(inputs)}")
 | |
| 
 | |
|         # Store activations for each layer
 | |
|         activations = [inputs]  # Input layer activations
 | |
| 
 | |
|         # Process each subsequent layer
 | |
|         for layer_idx in range(1, len(self.layers)):
 | |
|             layer_activations = []
 | |
| 
 | |
|             for neuron in self.layers[layer_idx]:
 | |
|                 if neuron['type'] == 'input':
 | |
|                     continue  # Skip input neurons in hidden layers
 | |
| 
 | |
|                 # Calculate weighted sum of inputs
 | |
|                 weighted_sum = neuron['bias']
 | |
| 
 | |
|                 for source_layer, source_neuron, weight in neuron['connections']:
 | |
|                     if source_layer < len(activations):
 | |
|                         if source_neuron < len(activations[source_layer]):
 | |
|                             weighted_sum += activations[source_layer][source_neuron] * weight
 | |
| 
 | |
|                 # Apply activation function (tanh for bounded output)
 | |
|                 activation = np.tanh(weighted_sum)
 | |
|                 layer_activations.append(activation)
 | |
| 
 | |
|             activations.append(layer_activations)
 | |
| 
 | |
|         return activations[-1]  # Return output layer activations
 | |
| 
 | |
|     def mutate(self, mutation_rate=0.1):
 | |
|         """
 | |
|         Create a mutated copy of this network.
 | |
| 
 | |
|         :param mutation_rate: Probability of each type of mutation
 | |
|         :return: New mutated FlexibleNeuralNetwork instance
 | |
|         """
 | |
|         mutated = deepcopy(self)
 | |
| 
 | |
|         # Different types of mutations
 | |
|         mutations = [
 | |
|             mutated._mutate_weights,
 | |
|             mutated._mutate_biases,
 | |
|             mutated._add_connection,
 | |
|             mutated._remove_connection,
 | |
|             mutated._add_neuron,
 | |
|             mutated._remove_neuron
 | |
|         ]
 | |
| 
 | |
|         # Apply random mutations
 | |
|         for mutation_func in mutations:
 | |
|             if random.random() < mutation_rate:
 | |
|                 mutation_func()
 | |
| 
 | |
|         return mutated
 | |
| 
 | |
|     def _mutate_weights(self):
 | |
|         """Slightly modify existing connection weights."""
 | |
|         for layer in self.layers[1:]:  # Skip input layer
 | |
|             for neuron in layer:
 | |
|                 if 'connections' in neuron:
 | |
|                     for i in range(len(neuron['connections'])):
 | |
|                         if random.random() < 0.3:  # 30% chance to mutate each weight
 | |
|                             source_layer, source_neuron, weight = neuron['connections'][i]
 | |
|                             # Add small random change
 | |
|                             new_weight = weight + random.uniform(-0.5, 0.5)
 | |
|                             neuron['connections'][i] = (source_layer, source_neuron, new_weight)
 | |
| 
 | |
|     def _mutate_biases(self):
 | |
|         """Slightly modify neuron biases."""
 | |
|         for layer in self.layers[1:]:  # Skip input layer
 | |
|             for neuron in layer:
 | |
|                 if 'bias' in neuron and random.random() < 0.3:
 | |
|                     neuron['bias'] += random.uniform(-0.5, 0.5)
 | |
| 
 | |
|     def _add_connection(self):
 | |
|         """Add a new random connection."""
 | |
|         if len(self.layers) < 2:
 | |
|             return
 | |
| 
 | |
|         # Pick a random target neuron (not in input layer)
 | |
|         target_layer_idx = random.randint(1, len(self.layers) - 1)
 | |
|         target_neuron_idx = random.randint(0, len(self.layers[target_layer_idx]) - 1)
 | |
|         target_neuron = self.layers[target_layer_idx][target_neuron_idx]
 | |
| 
 | |
|         if 'connections' not in target_neuron:
 | |
|             return
 | |
| 
 | |
|         # Pick a random source (from any previous layer)
 | |
|         source_layer_idx = random.randint(0, target_layer_idx - 1)
 | |
|         if len(self.layers[source_layer_idx]) == 0:
 | |
|             return
 | |
| 
 | |
|         source_neuron_idx = random.randint(0, len(self.layers[source_layer_idx]) - 1)
 | |
| 
 | |
|         # Check if connection already exists
 | |
|         for conn in target_neuron['connections']:
 | |
|             if conn[0] == source_layer_idx and conn[1] == source_neuron_idx:
 | |
|                 return  # Connection already exists
 | |
| 
 | |
|         # Add new connection
 | |
|         new_weight = random.uniform(-2, 2)
 | |
|         target_neuron['connections'].append((source_layer_idx, source_neuron_idx, new_weight))
 | |
| 
 | |
|     def _remove_connection(self):
 | |
|         """Remove a random connection."""
 | |
|         for layer in self.layers[1:]:
 | |
|             for neuron in layer:
 | |
|                 if 'connections' in neuron and len(neuron['connections']) > 1:
 | |
|                     if random.random() < 0.1:  # 10% chance to remove a connection
 | |
|                         neuron['connections'].pop(random.randint(0, len(neuron['connections']) - 1))
 | |
| 
 | |
|     def _add_neuron(self):
 | |
|         """Add a new neuron to a random hidden layer or create a new hidden layer."""
 | |
|         if random.random() < 0.05:  # 5% chance to add neuron
 | |
|             if len(self.layers) == 2:  # Only input and output layers
 | |
|                 # Create a new hidden layer
 | |
|                 hidden_neuron = {
 | |
|                     'type': 'hidden',
 | |
|                     'id': f'hidden_{random.randint(1000, 9999)}',
 | |
|                     'bias': random.uniform(-1, 1),
 | |
|                     'connections': []
 | |
|                 }
 | |
| 
 | |
|                 # Connect to some input neurons
 | |
|                 for i in range(self.input_size):
 | |
|                     if random.random() < 0.7:  # 70% chance to connect to each input
 | |
|                         hidden_neuron['connections'].append((0, i, random.uniform(-2, 2)))
 | |
| 
 | |
|                 # Insert hidden layer
 | |
|                 self.layers.insert(1, [hidden_neuron])
 | |
| 
 | |
|                 # Update output layer connections to potentially use new hidden neuron
 | |
|                 for neuron in self.layers[-1]:  # Output layer
 | |
|                     if random.random() < 0.5:  # 50% chance to connect to new hidden neuron
 | |
|                         neuron['connections'].append((1, 0, random.uniform(-2, 2)))
 | |
| 
 | |
|             else:
 | |
|                 # Add neuron to existing hidden layer
 | |
|                 hidden_layer_idx = random.randint(1, len(self.layers) - 2)
 | |
|                 new_neuron = {
 | |
|                     'type': 'hidden',
 | |
|                     'id': f'hidden_{random.randint(1000, 9999)}',
 | |
|                     'bias': random.uniform(-1, 1),
 | |
|                     'connections': []
 | |
|                 }
 | |
| 
 | |
|                 # Connect to some neurons from previous layers
 | |
|                 for layer_idx in range(hidden_layer_idx):
 | |
|                     for neuron_idx in range(len(self.layers[layer_idx])):
 | |
|                         if random.random() < 0.3:  # 30% chance to connect
 | |
|                             new_neuron['connections'].append((layer_idx, neuron_idx, random.uniform(-2, 2)))
 | |
| 
 | |
|                 self.layers[hidden_layer_idx].append(new_neuron)
 | |
| 
 | |
|     def _remove_neuron(self):
 | |
|         """Remove a random neuron from hidden layers."""
 | |
|         if len(self.layers) > 2:  # Has hidden layers
 | |
|             for layer_idx in range(1, len(self.layers) - 1):  # Only hidden layers
 | |
|                 if len(self.layers[layer_idx]) > 0 and random.random() < 0.02:  # 2% chance
 | |
|                     neuron_idx = random.randint(0, len(self.layers[layer_idx]) - 1)
 | |
|                     self.layers[layer_idx].pop(neuron_idx)
 | |
| 
 | |
|                     # Remove connections to this neuron from later layers
 | |
|                     for later_layer_idx in range(layer_idx + 1, len(self.layers)):
 | |
|                         for neuron in self.layers[later_layer_idx]:
 | |
|                             if 'connections' in neuron:
 | |
|                                 neuron['connections'] = [
 | |
|                                     (src_layer, src_neuron, weight)
 | |
|                                     for src_layer, src_neuron, weight in neuron['connections']
 | |
|                                     if not (src_layer == layer_idx and src_neuron == neuron_idx)
 | |
|                                 ]
 | |
|                     break
 | |
| 
 | |
|     def get_structure_info(self):
 | |
|         """Return information about the network structure."""
 | |
|         info = {
 | |
|             'total_layers': len(self.layers),
 | |
|             'layer_sizes': [len(layer) for layer in self.layers],
 | |
|             'total_connections': 0,
 | |
|             'total_neurons': sum(len(layer) for layer in self.layers)
 | |
|         }
 | |
| 
 | |
|         for layer in self.layers[1:]:
 | |
|             for neuron in layer:
 | |
|                 if 'connections' in neuron:
 | |
|                     info['total_connections'] += len(neuron['connections'])
 | |
| 
 | |
|         return info
 | |
| 
 | |
| 
 | |
| class CellBrain(BehavioralModel):
 | |
|     """
 | |
|     Enhanced CellBrain using a flexible neural network with input normalization.
 | |
|     """
 | |
| 
 | |
|     def __init__(self, neural_network=None, input_ranges=None):
 | |
|         super().__init__()
 | |
| 
 | |
|         # Define input and output keys
 | |
|         self.input_keys = ['distance', 'angle']
 | |
|         self.output_keys = ['linear_acceleration', 'angular_acceleration']
 | |
| 
 | |
|         # Initialize inputs and outputs
 | |
|         self.inputs = {key: 0.0 for key in self.input_keys}
 | |
|         self.outputs = {key: 0.0 for key in self.output_keys}
 | |
| 
 | |
|         # Set input ranges for normalization
 | |
|         default_ranges = {
 | |
|             'distance': (0, 50),
 | |
|             'angle': (-180, 180)
 | |
|         }
 | |
|         self.input_ranges = input_ranges if input_ranges is not None else default_ranges
 | |
| 
 | |
|         # Use provided network or create new one
 | |
|         if neural_network is None:
 | |
|             self.neural_network = FlexibleNeuralNetwork(
 | |
|                 input_size=len(self.input_keys),
 | |
|                 output_size=len(self.output_keys)
 | |
|             )
 | |
|         else:
 | |
|             self.neural_network = neural_network
 | |
| 
 | |
|     def _normalize_input(self, key, value):
 | |
|         min_val, max_val = self.input_ranges.get(key, (0.0, 1.0))
 | |
|         # Avoid division by zero
 | |
|         if max_val == min_val:
 | |
|             return 0.0
 | |
|         # Normalize to [-1, 1]
 | |
|         return 2 * (value - min_val) / (max_val - min_val) - 1
 | |
| 
 | |
|     def tick(self, input_data) -> dict:
 | |
|         """
 | |
|         Process inputs through neural network and produce outputs.
 | |
| 
 | |
|         :param input_data: Dictionary containing input values
 | |
|         :return: Dictionary with output values
 | |
|         """
 | |
|         # Update internal input state
 | |
|         for key in self.input_keys:
 | |
|             self.inputs[key] = input_data.get(key, 0.0)
 | |
| 
 | |
|         # Normalize inputs
 | |
|         input_array = [self._normalize_input(key, self.inputs[key]) for key in self.input_keys]
 | |
| 
 | |
|         # Process through neural network
 | |
|         output_array = self.neural_network.forward(input_array)
 | |
| 
 | |
|         # Map outputs back to dictionary
 | |
|         self.outputs = {
 | |
|             key: output_array[i] if i < len(output_array) else 0.0
 | |
|             for i, key in enumerate(self.output_keys)
 | |
|         }
 | |
| 
 | |
|         return self.outputs.copy()
 | |
| 
 | |
|     def mutate(self, mutation_rate=0.1):
 | |
|         """
 | |
|         Create a mutated copy of this CellBrain.
 | |
| 
 | |
|         :param mutation_rate: Rate of mutation for the neural network
 | |
|         :return: New CellBrain with mutated neural network
 | |
|         """
 | |
|         mutated_network = self.neural_network.mutate(mutation_rate)
 | |
|         return CellBrain(neural_network=mutated_network, input_ranges=self.input_ranges.copy())
 | |
| 
 | |
|     def get_network_info(self):
 | |
|         """Get information about the underlying neural network."""
 | |
|         return self.neural_network.get_structure_info()
 | |
| 
 | |
|     def __repr__(self):
 | |
|         inputs = {key: round(value, 5) for key, value in self.inputs.items()}
 | |
|         outputs = {key: round(value, 5) for key, value in self.outputs.items()}
 | |
|         network_info = self.get_network_info()
 | |
| 
 | |
|         return (f"CellBrain(inputs={inputs}, outputs={outputs}, "
 | |
|                 f"network_layers={network_info['layer_sizes']}, "
 | |
|                 f"connections={network_info['total_connections']})") |