Multi-Agent AI Systems
Build sophisticated AI systems where multiple agents interact, collaborate, and compete to create emergent gameplay experiences. This comprehensive tutorial covers multi-agent architectures, communication protocols, swarm intelligence, and complex AI ecosystems.
What You'll Learn
By the end of this tutorial, you'll understand:
- Multi-agent architectures and communication protocols
- Agent coordination and conflict resolution systems
- Emergent behavior and swarm intelligence techniques
- Competitive and cooperative AI systems
- Performance optimization for multiple agents
- Real-world applications in game development
Understanding Multi-Agent Systems
What are Multi-Agent Systems?
Multi-agent systems consist of multiple autonomous agents that interact with each other and their environment:
- Autonomous Agents: Independent entities with their own goals and behaviors
- Interactions: Communication, cooperation, competition, and negotiation
- Emergent Behavior: Complex behaviors that arise from simple agent interactions
- Distributed Intelligence: Collective intelligence greater than individual agents
Key Concepts
1. Agent Autonomy
Each agent operates independently with its own:
- Goals and objectives
- Decision-making processes
- Communication capabilities
- Learning and adaptation
2. Agent Communication
Agents communicate through:
- Message passing and protocols
- Shared knowledge bases
- Environmental signals
- Direct and indirect interactions
3. Emergent Behavior
Complex behaviors that emerge from simple rules:
- Swarm intelligence and flocking
- Collective decision-making
- Self-organizing systems
- Adaptive group behaviors
Step 1: Basic Multi-Agent Architecture
Agent Base Class
import uuid
import time
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod
class Agent(ABC):
def __init__(self, agent_id=None, name="Agent"):
self.id = agent_id or str(uuid.uuid4())
self.name = name
self.position = (0, 0)
self.state = "idle"
self.goals = []
self.memory = {}
self.communication_queue = []
self.other_agents = {}
self.environment = None
# Agent capabilities
self.speed = 1.0
self.vision_range = 10.0
self.communication_range = 5.0
self.energy = 100.0
self.max_energy = 100.0
# Learning and adaptation
self.learning_rate = 0.1
self.adaptation_threshold = 0.5
self.success_history = []
@abstractmethod
def perceive(self, environment):
"""Perceive the environment and other agents"""
pass
@abstractmethod
def decide(self, perceptions):
"""Make decisions based on perceptions"""
pass
@abstractmethod
def act(self, decision):
"""Execute the decided action"""
pass
def update(self, environment):
"""Main update loop"""
# Perceive environment
perceptions = self.perceive(environment)
# Make decision
decision = self.decide(perceptions)
# Execute action
result = self.act(decision)
# Learn from experience
self.learn_from_experience(decision, result)
return result
def communicate(self, message, target_agent=None):
"""Send message to other agents"""
if target_agent:
# Direct communication
if target_agent in self.other_agents:
self.other_agents[target_agent].receive_message(message, self.id)
else:
# Broadcast communication
for agent_id, agent in self.other_agents.items():
if self._is_in_range(agent):
agent.receive_message(message, self.id)
def receive_message(self, message, sender_id):
"""Receive message from another agent"""
self.communication_queue.append({
"message": message,
"sender": sender_id,
"timestamp": time.time()
})
def _is_in_range(self, other_agent):
"""Check if another agent is in communication range"""
distance = self._calculate_distance(other_agent.position)
return distance <= self.communication_range
def _calculate_distance(self, position):
"""Calculate distance to a position"""
return ((self.position[0] - position[0])**2 +
(self.position[1] - position[1])**2)**0.5
def learn_from_experience(self, decision, result):
"""Learn from experience and adapt behavior"""
success = self._evaluate_success(decision, result)
self.success_history.append(success)
# Keep only recent history
if len(self.success_history) > 100:
self.success_history = self.success_history[-100:]
# Adapt behavior if success rate is low
if len(self.success_history) > 10:
recent_success_rate = sum(self.success_history[-10:]) / 10
if recent_success_rate < self.adaptation_threshold:
self.adapt_behavior()
def _evaluate_success(self, decision, result):
"""Evaluate if the decision was successful"""
# Simple success evaluation (override in subclasses)
return result.get("success", False)
def adapt_behavior(self):
"""Adapt behavior based on learning"""
# Override in subclasses for specific adaptation
pass
Environment for Multi-Agent Systems
class MultiAgentEnvironment:
def __init__(self, width=100, height=100):
self.width = width
self.height = height
self.agents = {}
self.obstacles = []
self.resources = []
self.time_step = 0
self.global_state = {}
# Environment properties
self.gravity = 0.0
self.friction = 0.1
self.temperature = 20.0
self.visibility = 1.0
def add_agent(self, agent):
"""Add agent to environment"""
self.agents[agent.id] = agent
agent.environment = self
agent.other_agents = {aid: a for aid, a in self.agents.items() if aid != agent.id}
def remove_agent(self, agent_id):
"""Remove agent from environment"""
if agent_id in self.agents:
del self.agents[agent_id]
# Update other agents' references
for agent in self.agents.values():
agent.other_agents = {aid: a for aid, a in self.agents.items() if aid != agent.id}
def update(self):
"""Update all agents in environment"""
self.time_step += 1
# Update each agent
for agent in self.agents.values():
agent.update(self)
# Update environment state
self._update_environment_state()
def _update_environment_state(self):
"""Update global environment state"""
self.global_state = {
"time_step": self.time_step,
"agent_count": len(self.agents),
"average_energy": sum(agent.energy for agent in self.agents.values()) / len(self.agents) if self.agents else 0,
"resource_count": len(self.resources)
}
def get_agents_in_range(self, position, range_distance):
"""Get agents within range of a position"""
agents_in_range = []
for agent in self.agents.values():
distance = ((agent.position[0] - position[0])**2 +
(agent.position[1] - position[1])**2)**0.5
if distance <= range_distance:
agents_in_range.append(agent)
return agents_in_range
def add_resource(self, position, resource_type, value):
"""Add resource to environment"""
self.resources.append({
"position": position,
"type": resource_type,
"value": value,
"id": str(uuid.uuid4())
})
def remove_resource(self, resource_id):
"""Remove resource from environment"""
self.resources = [r for r in self.resources if r["id"] != resource_id]
Step 2: Cooperative Multi-Agent Systems
Cooperative Agent Implementation
class CooperativeAgent(Agent):
def __init__(self, agent_id=None, name="CooperativeAgent"):
super().__init__(agent_id, name)
self.cooperation_level = 0.8
self.trust_levels = {} # agent_id -> trust_level
self.shared_goals = []
self.team_members = set()
# Cooperation parameters
self.help_threshold = 0.6
self.sharing_threshold = 0.7
self.leadership_tendency = 0.5
def perceive(self, environment):
"""Perceive environment and other agents"""
perceptions = {
"self": {
"position": self.position,
"energy": self.energy,
"state": self.state
},
"environment": {
"resources": environment.resources,
"obstacles": environment.obstacles,
"global_state": environment.global_state
},
"other_agents": {}
}
# Perceive other agents in range
nearby_agents = environment.get_agents_in_range(self.position, self.vision_range)
for agent in nearby_agents:
if agent.id != self.id:
perceptions["other_agents"][agent.id] = {
"position": agent.position,
"state": agent.state,
"energy": agent.energy,
"distance": self._calculate_distance(agent.position)
}
# Check for messages
perceptions["messages"] = self.communication_queue.copy()
self.communication_queue.clear()
return perceptions
def decide(self, perceptions):
"""Make cooperative decisions"""
decision = {
"action": "idle",
"target": None,
"cooperation": False,
"message": None
}
# Check for help requests
help_requests = self._check_help_requests(perceptions)
if help_requests and self.cooperation_level > self.help_threshold:
decision = self._decide_to_help(help_requests)
# Check for resource sharing opportunities
elif self._should_share_resources(perceptions):
decision = self._decide_to_share(perceptions)
# Check for team coordination
elif self._should_coordinate(perceptions):
decision = self._decide_coordination(perceptions)
# Default behavior
else:
decision = self._default_behavior(perceptions)
return decision
def act(self, decision):
"""Execute cooperative action"""
result = {"success": False, "message": None}
if decision["action"] == "help":
result = self._execute_help(decision)
elif decision["action"] == "share":
result = self._execute_sharing(decision)
elif decision["action"] == "coordinate":
result = self._execute_coordination(decision)
elif decision["action"] == "move":
result = self._execute_movement(decision)
elif decision["action"] == "communicate":
result = self._execute_communication(decision)
return result
def _check_help_requests(self, perceptions):
"""Check for help requests from other agents"""
help_requests = []
for message in perceptions.get("messages", []):
if message["message"].get("type") == "help_request":
help_requests.append(message)
return help_requests
def _should_share_resources(self, perceptions):
"""Check if should share resources with other agents"""
if not perceptions["other_agents"]:
return False
# Check if other agents need resources
for agent_id, agent_info in perceptions["other_agents"].items():
if agent_info["energy"] < 30: # Low energy threshold
return True
return False
def _should_coordinate(self, perceptions):
"""Check if should coordinate with other agents"""
if len(perceptions["other_agents"]) > 1:
return True
return False
def _decide_to_help(self, help_requests):
"""Decide how to help other agents"""
# Choose the most urgent help request
urgent_request = max(help_requests, key=lambda r: r["message"].get("urgency", 0))
return {
"action": "help",
"target": urgent_request["sender"],
"cooperation": True,
"message": {"type": "help_response", "content": "I'll help you!"}
}
def _decide_to_share(self, perceptions):
"""Decide what to share with other agents"""
# Find agent with lowest energy
lowest_energy_agent = min(
perceptions["other_agents"].items(),
key=lambda x: x[1]["energy"]
)
return {
"action": "share",
"target": lowest_energy_agent[0],
"cooperation": True,
"message": {"type": "resource_offer", "content": "I can share some energy with you"}
}
def _decide_coordination(self, perceptions):
"""Decide how to coordinate with other agents"""
# Simple coordination: move towards center of nearby agents
if len(perceptions["other_agents"]) > 0:
center_x = sum(agent["position"][0] for agent in perceptions["other_agents"].values()) / len(perceptions["other_agents"])
center_y = sum(agent["position"][1] for agent in perceptions["other_agents"].values()) / len(perceptions["other_agents"])
return {
"action": "move",
"target": (center_x, center_y),
"cooperation": True,
"message": {"type": "coordination", "content": "Moving to coordinate"}
}
return self._default_behavior(perceptions)
def _default_behavior(self, perceptions):
"""Default behavior when no cooperation is needed"""
# Look for resources
if perceptions["environment"]["resources"]:
nearest_resource = min(
perceptions["environment"]["resources"],
key=lambda r: self._calculate_distance(r["position"])
)
return {
"action": "move",
"target": nearest_resource["position"],
"cooperation": False,
"message": None
}
return {
"action": "idle",
"target": None,
"cooperation": False,
"message": None
}
def _execute_help(self, decision):
"""Execute help action"""
target_agent = self.other_agents.get(decision["target"])
if target_agent:
# Transfer energy to help
energy_to_transfer = min(20, self.energy * 0.2)
self.energy -= energy_to_transfer
target_agent.energy += energy_to_transfer
# Update trust
if decision["target"] not in self.trust_levels:
self.trust_levels[decision["target"]] = 0.5
self.trust_levels[decision["target"]] = min(1.0, self.trust_levels[decision["target"]] + 0.1)
return {"success": True, "message": f"Helped {target_agent.name}"}
return {"success": False, "message": "Target agent not found"}
def _execute_sharing(self, decision):
"""Execute resource sharing"""
target_agent = self.other_agents.get(decision["target"])
if target_agent:
# Share energy
energy_to_share = min(15, self.energy * 0.15)
self.energy -= energy_to_share
target_agent.energy += energy_to_share
return {"success": True, "message": f"Shared resources with {target_agent.name}"}
return {"success": False, "message": "Target agent not found"}
def _execute_coordination(self, decision):
"""Execute coordination action"""
if decision["target"]:
# Move towards target position
self._move_towards(decision["target"])
return {"success": True, "message": "Coordinated movement"}
return {"success": False, "message": "No target for coordination"}
def _execute_movement(self, decision):
"""Execute movement action"""
if decision["target"]:
self._move_towards(decision["target"])
return {"success": True, "message": "Moved towards target"}
return {"success": False, "message": "No movement target"}
def _execute_communication(self, decision):
"""Execute communication action"""
if decision["message"]:
self.communicate(decision["message"], decision.get("target"))
return {"success": True, "message": "Communication sent"}
return {"success": False, "message": "No message to send"}
def _move_towards(self, target_position):
"""Move towards target position"""
dx = target_position[0] - self.position[0]
dy = target_position[1] - self.position[1]
distance = (dx**2 + dy**2)**0.5
if distance > 0:
# Normalize direction
dx /= distance
dy /= distance
# Move with speed limit
move_distance = min(self.speed, distance)
self.position = (
self.position[0] + dx * move_distance,
self.position[1] + dy * move_distance
)
# Consume energy
self.energy = max(0, self.energy - move_distance * 0.1)
Step 3: Competitive Multi-Agent Systems
Competitive Agent Implementation
class CompetitiveAgent(Agent):
def __init__(self, agent_id=None, name="CompetitiveAgent"):
super().__init__(agent_id, name)
self.aggression_level = 0.7
self.competitiveness = 0.8
self.strategy = "aggressive" # aggressive, defensive, opportunistic
self.rivals = set()
self.allies = set()
# Competitive parameters
self.attack_threshold = 0.6
self.defense_threshold = 0.4
self.opportunity_threshold = 0.5
def perceive(self, environment):
"""Perceive environment with competitive focus"""
perceptions = super().perceive(environment)
# Add competitive information
perceptions["threats"] = self._identify_threats(perceptions)
perceptions["opportunities"] = self._identify_opportunities(perceptions)
perceptions["rivals"] = self._identify_rivals(perceptions)
return perceptions
def decide(self, perceptions):
"""Make competitive decisions"""
decision = {
"action": "idle",
"target": None,
"aggressive": False,
"message": None
}
# Check for attack opportunities
if self._should_attack(perceptions):
decision = self._decide_attack(perceptions)
# Check for defense needs
elif self._should_defend(perceptions):
decision = self._decide_defense(perceptions)
# Check for opportunistic actions
elif self._should_exploit_opportunity(perceptions):
decision = self._decide_opportunity(perceptions)
# Default competitive behavior
else:
decision = self._default_competitive_behavior(perceptions)
return decision
def _identify_threats(self, perceptions):
"""Identify potential threats"""
threats = []
for agent_id, agent_info in perceptions["other_agents"].items():
# Consider agents with higher energy as threats
if agent_info["energy"] > self.energy * 1.2:
threats.append({
"agent_id": agent_id,
"threat_level": agent_info["energy"] / self.energy,
"distance": agent_info["distance"]
})
return threats
def _identify_opportunities(self, perceptions):
"""Identify opportunities for competitive advantage"""
opportunities = []
# Look for weak agents
for agent_id, agent_info in perceptions["other_agents"].items():
if agent_info["energy"] < self.energy * 0.8:
opportunities.append({
"type": "weak_agent",
"agent_id": agent_id,
"advantage": self.energy / agent_info["energy"]
})
# Look for resources
for resource in perceptions["environment"]["resources"]:
opportunities.append({
"type": "resource",
"position": resource["position"],
"value": resource["value"]
})
return opportunities
def _identify_rivals(self, perceptions):
"""Identify rival agents"""
rivals = []
for agent_id, agent_info in perceptions["other_agents"].items():
if agent_id in self.rivals:
rivals.append({
"agent_id": agent_id,
"rivalry_level": self._calculate_rivalry_level(agent_id),
"distance": agent_info["distance"]
})
return rivals
def _should_attack(self, perceptions):
"""Check if should attack another agent"""
if not perceptions["threats"] and not perceptions["opportunities"]:
return False
# Attack if we have advantage
for opportunity in perceptions["opportunities"]:
if opportunity["type"] == "weak_agent" and opportunity["advantage"] > 1.5:
return True
return False
def _should_defend(self, perceptions):
"""Check if should defend against threats"""
if not perceptions["threats"]:
return False
# Defend if threatened
for threat in perceptions["threats"]:
if threat["threat_level"] > 1.5 and threat["distance"] < 5:
return True
return False
def _should_exploit_opportunity(self, perceptions):
"""Check if should exploit an opportunity"""
if not perceptions["opportunities"]:
return False
# Exploit if opportunity is good
for opportunity in perceptions["opportunities"]:
if opportunity["type"] == "resource" and opportunity["value"] > 50:
return True
return False
def _decide_attack(self, perceptions):
"""Decide how to attack"""
# Find best target
best_target = None
best_advantage = 0
for opportunity in perceptions["opportunities"]:
if opportunity["type"] == "weak_agent":
if opportunity["advantage"] > best_advantage:
best_advantage = opportunity["advantage"]
best_target = opportunity["agent_id"]
if best_target:
return {
"action": "attack",
"target": best_target,
"aggressive": True,
"message": {"type": "threat", "content": "You're going down!"}
}
return self._default_competitive_behavior(perceptions)
def _decide_defense(self, perceptions):
"""Decide how to defend"""
# Find nearest threat
nearest_threat = min(perceptions["threats"], key=lambda t: t["distance"])
# Move away from threat
threat_agent = self.other_agents.get(nearest_threat["agent_id"])
if threat_agent:
# Calculate escape direction
dx = self.position[0] - threat_agent.position[0]
dy = self.position[1] - threat_agent.position[1]
distance = (dx**2 + dy**2)**0.5
if distance > 0:
escape_x = self.position[0] + (dx / distance) * 10
escape_y = self.position[1] + (dy / distance) * 10
return {
"action": "move",
"target": (escape_x, escape_y),
"aggressive": False,
"message": {"type": "defense", "content": "Retreating!"}
}
return self._default_competitive_behavior(perceptions)
def _decide_opportunity(self, perceptions):
"""Decide how to exploit opportunity"""
# Find best opportunity
best_opportunity = max(perceptions["opportunities"], key=lambda o: o.get("value", 0))
if best_opportunity["type"] == "resource":
return {
"action": "move",
"target": best_opportunity["position"],
"aggressive": False,
"message": {"type": "opportunity", "content": "Going for the resource!"}
}
return self._default_competitive_behavior(perceptions)
def _default_competitive_behavior(self, perceptions):
"""Default competitive behavior"""
# Look for resources or weak agents
if perceptions["opportunities"]:
best_opportunity = max(perceptions["opportunities"], key=lambda o: o.get("value", 0))
if best_opportunity["type"] == "resource":
return {
"action": "move",
"target": best_opportunity["position"],
"aggressive": False,
"message": None
}
return {
"action": "idle",
"target": None,
"aggressive": False,
"message": None
}
def _calculate_rivalry_level(self, agent_id):
"""Calculate rivalry level with another agent"""
# Simple rivalry calculation
if agent_id in self.rivals:
return 0.8
return 0.2
Step 4: Swarm Intelligence Systems
Swarm Agent Implementation
class SwarmAgent(Agent):
def __init__(self, agent_id=None, name="SwarmAgent"):
super().__init__(agent_id, name)
self.swarm_id = None
self.swarm_center = (0, 0)
self.swarm_velocity = (0, 0)
self.swarm_size = 0
# Swarm behavior parameters
self.separation_weight = 1.0
self.alignment_weight = 1.0
self.cohesion_weight = 1.0
self.avoidance_weight = 2.0
# Swarm communication
self.swarm_messages = []
self.consensus_threshold = 0.7
def perceive(self, environment):
"""Perceive environment with swarm focus"""
perceptions = super().perceive(environment)
# Add swarm-specific information
perceptions["swarm_members"] = self._identify_swarm_members(perceptions)
perceptions["swarm_center"] = self._calculate_swarm_center(perceptions)
perceptions["swarm_velocity"] = self._calculate_swarm_velocity(perceptions)
perceptions["swarm_messages"] = self.swarm_messages.copy()
self.swarm_messages.clear()
return perceptions
def decide(self, perceptions):
"""Make swarm-based decisions"""
decision = {
"action": "swarm_move",
"target": None,
"swarm_behavior": True,
"message": None
}
# Apply swarm rules
if perceptions["swarm_members"]:
# Calculate swarm forces
separation_force = self._calculate_separation_force(perceptions)
alignment_force = self._calculate_alignment_force(perceptions)
cohesion_force = self._calculate_cohesion_force(perceptions)
avoidance_force = self._calculate_avoidance_force(perceptions)
# Combine forces
total_force = self._combine_forces(
separation_force, alignment_force,
cohesion_force, avoidance_force
)
# Calculate new position
new_position = (
self.position[0] + total_force[0],
self.position[1] + total_force[1]
)
decision["target"] = new_position
decision["swarm_behavior"] = True
# Check for swarm consensus
if self._should_reach_consensus(perceptions):
decision["message"] = self._create_consensus_message(perceptions)
return decision
def _identify_swarm_members(self, perceptions):
"""Identify other agents in the swarm"""
swarm_members = []
for agent_id, agent_info in perceptions["other_agents"].items():
if agent_info["distance"] <= self.communication_range:
swarm_members.append({
"agent_id": agent_id,
"position": agent_info["position"],
"velocity": agent_info.get("velocity", (0, 0)),
"distance": agent_info["distance"]
})
return swarm_members
def _calculate_swarm_center(self, perceptions):
"""Calculate center of swarm"""
if not perceptions["swarm_members"]:
return self.position
center_x = sum(member["position"][0] for member in perceptions["swarm_members"]) / len(perceptions["swarm_members"])
center_y = sum(member["position"][1] for member in perceptions["swarm_members"]) / len(perceptions["swarm_members"])
return (center_x, center_y)
def _calculate_swarm_velocity(self, perceptions):
"""Calculate average velocity of swarm"""
if not perceptions["swarm_members"]:
return (0, 0)
avg_vx = sum(member["velocity"][0] for member in perceptions["swarm_members"]) / len(perceptions["swarm_members"])
avg_vy = sum(member["velocity"][1] for member in perceptions["swarm_members"]) / len(perceptions["swarm_members"])
return (avg_vx, avg_vy)
def _calculate_separation_force(self, perceptions):
"""Calculate separation force to avoid crowding"""
separation_force = (0, 0)
for member in perceptions["swarm_members"]:
distance = member["distance"]
if distance > 0 and distance < 2: # Too close
# Calculate repulsion force
dx = self.position[0] - member["position"][0]
dy = self.position[1] - member["position"][1]
# Normalize and scale
force_magnitude = self.separation_weight / (distance * distance)
separation_force = (
separation_force[0] + (dx / distance) * force_magnitude,
separation_force[1] + (dy / distance) * force_magnitude
)
return separation_force
def _calculate_alignment_force(self, perceptions):
"""Calculate alignment force to match swarm velocity"""
if not perceptions["swarm_members"]:
return (0, 0)
swarm_velocity = perceptions["swarm_velocity"]
current_velocity = (0, 0) # Simplified - would track actual velocity
alignment_force = (
(swarm_velocity[0] - current_velocity[0]) * self.alignment_weight,
(swarm_velocity[1] - current_velocity[1]) * self.alignment_weight
)
return alignment_force
def _calculate_cohesion_force(self, perceptions):
"""Calculate cohesion force to move towards swarm center"""
swarm_center = perceptions["swarm_center"]
dx = swarm_center[0] - self.position[0]
dy = swarm_center[1] - self.position[1]
distance = (dx**2 + dy**2)**0.5
if distance > 0:
cohesion_force = (
(dx / distance) * self.cohesion_weight,
(dy / distance) * self.cohesion_weight
)
else:
cohesion_force = (0, 0)
return cohesion_force
def _calculate_avoidance_force(self, perceptions):
"""Calculate avoidance force to avoid obstacles"""
avoidance_force = (0, 0)
# Avoid obstacles
for obstacle in perceptions["environment"]["obstacles"]:
distance = self._calculate_distance(obstacle["position"])
if distance < 5: # Too close to obstacle
dx = self.position[0] - obstacle["position"][0]
dy = self.position[1] - obstacle["position"][1]
if distance > 0:
force_magnitude = self.avoidance_weight / distance
avoidance_force = (
avoidance_force[0] + (dx / distance) * force_magnitude,
avoidance_force[1] + (dy / distance) * force_magnitude
)
return avoidance_force
def _combine_forces(self, separation, alignment, cohesion, avoidance):
"""Combine all forces into final movement"""
total_force = (
separation[0] + alignment[0] + cohesion[0] + avoidance[0],
separation[1] + alignment[1] + cohesion[1] + avoidance[1]
)
# Limit force magnitude
magnitude = (total_force[0]**2 + total_force[1]**2)**0.5
if magnitude > self.speed:
total_force = (
(total_force[0] / magnitude) * self.speed,
(total_force[1] / magnitude) * self.speed
)
return total_force
def _should_reach_consensus(self, perceptions):
"""Check if should reach consensus with swarm"""
if len(perceptions["swarm_members"]) > 3:
return True
return False
def _create_consensus_message(self, perceptions):
"""Create consensus message for swarm"""
return {
"type": "swarm_consensus",
"content": "Let's move together",
"swarm_center": perceptions["swarm_center"],
"swarm_velocity": perceptions["swarm_velocity"]
}
Step 5: Multi-Agent System Testing
Test Suite for Multi-Agent Systems
def test_multi_agent_system():
"""Test multi-agent system functionality"""
print("Testing Multi-Agent AI System")
print("=" * 50)
# Create environment
environment = MultiAgentEnvironment(100, 100)
# Create cooperative agents
cooperative_agents = []
for i in range(5):
agent = CooperativeAgent(name=f"CooperativeAgent_{i}")
agent.position = (i * 10, i * 10)
environment.add_agent(agent)
cooperative_agents.append(agent)
# Create competitive agents
competitive_agents = []
for i in range(3):
agent = CompetitiveAgent(name=f"CompetitiveAgent_{i}")
agent.position = (50 + i * 15, 50 + i * 15)
environment.add_agent(agent)
competitive_agents.append(agent)
# Create swarm agents
swarm_agents = []
for i in range(8):
agent = SwarmAgent(name=f"SwarmAgent_{i}")
agent.position = (20 + i * 5, 20 + i * 5)
environment.add_agent(agent)
swarm_agents.append(agent)
# Add resources
for i in range(10):
environment.add_resource(
(i * 10, i * 10),
"energy",
50
)
# Test system for multiple time steps
print("Running multi-agent simulation...")
for time_step in range(100):
environment.update()
# Check for emergent behaviors
if time_step % 20 == 0:
print(f"Time step {time_step}:")
print(f" Agents: {len(environment.agents)}")
print(f" Resources: {len(environment.resources)}")
print(f" Average energy: {environment.global_state['average_energy']:.2f}")
# Test cooperation
print("\nTesting cooperation...")
agent1 = cooperative_agents[0]
agent2 = cooperative_agents[1]
# Simulate help request
agent2.energy = 20 # Low energy
agent1.communicate({"type": "help_request", "urgency": 0.8}, agent2.id)
# Test cooperation response
perceptions = agent1.perceive(environment)
decision = agent1.decide(perceptions)
assert decision["cooperation"] == True
print("ā Cooperation working")
# Test competition
print("Testing competition...")
comp_agent1 = competitive_agents[0]
comp_agent2 = competitive_agents[1]
# Simulate competitive scenario
comp_agent2.energy = 30 # Lower energy
perceptions = comp_agent1.perceive(environment)
decision = comp_agent1.decide(perceptions)
# Should show competitive behavior
assert decision["aggressive"] or decision["action"] != "idle"
print("ā Competition working")
# Test swarm behavior
print("Testing swarm behavior...")
swarm_agent = swarm_agents[0]
perceptions = swarm_agent.perceive(environment)
decision = swarm_agent.decide(perceptions)
# Should show swarm behavior
assert decision["swarm_behavior"] == True
print("ā Swarm behavior working")
# Test performance
print("Testing performance...")
start_time = time.time()
for i in range(50):
environment.update()
end_time = time.time()
avg_time = (end_time - start_time) / 50
assert avg_time < 0.1 # Should update within 0.1 seconds
print(f"ā Performance test passed (avg: {avg_time:.3f}s)")
print("\nš All multi-agent tests passed!")
if __name__ == "__main__":
test_multi_agent_system()
Best Practices for Multi-Agent Systems
1. System Design
- Design clear agent roles and responsibilities
- Implement efficient communication protocols
- Balance cooperation and competition for interesting dynamics
- Plan for scalability from the beginning
2. Performance Optimization
- Use spatial partitioning for large numbers of agents
- Implement efficient collision detection and range queries
- Cache frequently used calculations and perceptions
- Monitor performance metrics and optimize bottlenecks
3. Emergent Behavior
- Design simple rules that lead to complex behaviors
- Test for emergent properties and unexpected interactions
- Document emergent behaviors for future reference
- Balance predictability and surprise in agent behavior
4. Testing and Validation
- Test individual agent behaviors before system integration
- Validate emergent behaviors through extensive simulation
- Monitor system performance under various conditions
- Implement debugging tools for complex interactions
Next Steps
Congratulations! You've learned how to build multi-agent AI systems. Here's what to do next:
1. Practice with Advanced Features
- Implement more sophisticated communication protocols
- Build complex emergent behaviors
- Create hybrid systems with both cooperation and competition
- Experiment with different swarm algorithms
2. Explore Procedural Content Generation
- Learn about AI-powered content generation
- Implement procedural quest and story systems
- Create dynamic world generation
- Build adaptive content systems
3. Continue Learning
- Move to the next tutorial: Procedural Content Generation with AI
- Learn about AI-powered quest systems
- Study performance optimization techniques
- Explore advanced testing methods
4. Build Your Projects
- Create multi-agent game systems
- Implement swarm intelligence
- Build cooperative and competitive AI
- Share your work with the community
Resources and Further Reading
Documentation
Community
Tools
Conclusion
You've learned how to build sophisticated multi-agent AI systems. You now understand:
- How to design and implement multi-agent architectures
- How to create cooperative and competitive AI systems
- How to implement swarm intelligence and emergent behaviors
- How to optimize performance for multiple agents
- How to test and validate complex multi-agent systems
Your AI systems can now exhibit complex emergent behaviors through the interactions of multiple autonomous agents. This foundation will serve you well as you continue to explore advanced AI game development techniques.
Ready for the next step? Continue with Procedural Content Generation with AI to learn how to use AI to generate game content dynamically.
This tutorial is part of the GamineAI Intermediate Tutorial Series. Learn advanced AI techniques, build sophisticated systems, and create professional-grade AI-powered games.