Multi-Agent AI Systems - Collaborative and Competitive Intelligence

Master multi-agent AI systems where multiple agents interact, collaborate, and compete. Learn to build emergent behaviors, swarm intelligence, and complex AI ecosystems.

Learning · Feb 27, 2025 · 90 min read

By GamineAI Team

Multi-Agent AI Systems

Build sophisticated AI systems where multiple agents interact, collaborate, and compete to create emergent gameplay experiences. This comprehensive tutorial covers multi-agent architectures, communication protocols, swarm intelligence, and complex AI ecosystems.

What You'll Learn

By the end of this tutorial, you'll understand:

  • Multi-agent architectures and communication protocols
  • Agent coordination and conflict resolution systems
  • Emergent behavior and swarm intelligence techniques
  • Competitive and cooperative AI systems
  • Performance optimization for multiple agents
  • Real-world applications in game development

Understanding Multi-Agent Systems

What are Multi-Agent Systems?

Multi-agent systems consist of multiple autonomous agents that interact with each other and their environment:

  • Autonomous Agents: Independent entities with their own goals and behaviors
  • Interactions: Communication, cooperation, competition, and negotiation
  • Emergent Behavior: Complex behaviors that arise from simple agent interactions
  • Distributed Intelligence: Collective capability greater than that of any single agent

Key Concepts

1. Agent Autonomy

Each agent operates independently with its own:

  • Goals and objectives
  • Decision-making processes
  • Communication capabilities
  • Learning and adaptation

2. Agent Communication

Agents communicate through:

  • Message passing and protocols
  • Shared knowledge bases
  • Environmental signals
  • Direct and indirect interactions
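
Message passing is easiest to reason about with an explicit schema. The sketch below is an illustrative convention (the `make_message` helper and its field names are not a standard), matching the dictionary-style messages used in the code later in this tutorial:

```python
import time
import uuid

def make_message(msg_type, content, urgency=0.0):
    """Build a simple agent-to-agent message envelope.

    The type/content/urgency/id/timestamp fields are an illustrative
    convention for this tutorial, not a formal protocol.
    """
    return {
        "type": msg_type,      # e.g. "help_request", "resource_offer"
        "content": content,
        "urgency": urgency,    # 0.0 (low) .. 1.0 (critical)
        "id": str(uuid.uuid4()),
        "timestamp": time.time(),
    }

msg = make_message("help_request", "Low on energy near (12, 40)", urgency=0.9)
```

Keeping every message a plain dict with a `type` field lets receivers dispatch on it without coupling agents to each other's classes.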

3. Emergent Behavior

Complex behaviors that emerge from simple rules:

  • Swarm intelligence and flocking
  • Collective decision-making
  • Self-organizing systems
  • Adaptive group behaviors
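
Even one simple rule can produce collective behavior. In this toy sketch (the `consensus_step` function is a hypothetical illustration, not part of the classes below), every agent repeatedly moves partway toward the group's mean position, and the group converges on a shared location with no central controller:

```python
def consensus_step(positions, rate=0.5):
    """One round: every agent moves part-way toward the group mean."""
    mean = sum(positions) / len(positions)
    return [p + rate * (mean - p) for p in positions]

positions = [0.0, 10.0, 20.0, 90.0]
for _ in range(20):
    positions = consensus_step(positions)
# After 20 rounds the agents are tightly clustered around the initial mean (30.0).
```

The mean stays fixed while the spread halves every round: convergence is an emergent property of the local rule, not of any coordinator.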

Step 1: Basic Multi-Agent Architecture

Agent Base Class

import uuid
import time
from typing import Dict, List, Any, Optional
from abc import ABC, abstractmethod

class Agent(ABC):
    def __init__(self, agent_id=None, name="Agent"):
        self.id = agent_id or str(uuid.uuid4())
        self.name = name
        self.position = (0, 0)
        self.state = "idle"
        self.goals = []
        self.memory = {}
        self.communication_queue = []
        self.other_agents = {}
        self.environment = None

        # Agent capabilities
        self.speed = 1.0
        self.vision_range = 10.0
        self.communication_range = 5.0
        self.energy = 100.0
        self.max_energy = 100.0

        # Learning and adaptation
        self.learning_rate = 0.1
        self.adaptation_threshold = 0.5
        self.success_history = []

    def perceive(self, environment):
        """Perceive the environment and nearby agents.

        Provided as a concrete default so that subclasses can extend it
        via super().perceive(); decide() and act() remain abstract.
        """
        perceptions = {
            "self": {
                "position": self.position,
                "energy": self.energy,
                "state": self.state
            },
            "environment": {
                "resources": environment.resources,
                "obstacles": environment.obstacles,
                "global_state": environment.global_state
            },
            "other_agents": {}
        }

        # Perceive other agents within vision range
        for agent in environment.get_agents_in_range(self.position, self.vision_range):
            if agent.id != self.id:
                perceptions["other_agents"][agent.id] = {
                    "position": agent.position,
                    "state": agent.state,
                    "energy": agent.energy,
                    "distance": self._calculate_distance(agent.position)
                }

        # Drain the message queue into this perception frame
        perceptions["messages"] = self.communication_queue.copy()
        self.communication_queue.clear()

        return perceptions

    @abstractmethod
    def decide(self, perceptions):
        """Make decisions based on perceptions"""
        pass

    @abstractmethod
    def act(self, decision):
        """Execute the decided action"""
        pass

    def update(self, environment):
        """Main update loop"""
        # Perceive environment
        perceptions = self.perceive(environment)

        # Make decision
        decision = self.decide(perceptions)

        # Execute action
        result = self.act(decision)

        # Learn from experience
        self.learn_from_experience(decision, result)

        return result

    def communicate(self, message, target_agent=None):
        """Send message to other agents"""
        if target_agent:
            # Direct communication
            if target_agent in self.other_agents:
                self.other_agents[target_agent].receive_message(message, self.id)
        else:
            # Broadcast communication
            for agent_id, agent in self.other_agents.items():
                if self._is_in_range(agent):
                    agent.receive_message(message, self.id)

    def receive_message(self, message, sender_id):
        """Receive message from another agent"""
        self.communication_queue.append({
            "message": message,
            "sender": sender_id,
            "timestamp": time.time()
        })

    def _is_in_range(self, other_agent):
        """Check if another agent is in communication range"""
        distance = self._calculate_distance(other_agent.position)
        return distance <= self.communication_range

    def _calculate_distance(self, position):
        """Calculate distance to a position"""
        return ((self.position[0] - position[0])**2 + 
                (self.position[1] - position[1])**2)**0.5

    def learn_from_experience(self, decision, result):
        """Learn from experience and adapt behavior"""
        success = self._evaluate_success(decision, result)
        self.success_history.append(success)

        # Keep only recent history
        if len(self.success_history) > 100:
            self.success_history = self.success_history[-100:]

        # Adapt behavior if success rate is low
        if len(self.success_history) > 10:
            recent_success_rate = sum(self.success_history[-10:]) / 10
            if recent_success_rate < self.adaptation_threshold:
                self.adapt_behavior()

    def _evaluate_success(self, decision, result):
        """Evaluate if the decision was successful"""
        # Simple success evaluation (override in subclasses)
        return result.get("success", False)

    def adapt_behavior(self):
        """Adapt behavior based on learning"""
        # Override in subclasses for specific adaptation
        pass
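
Before adding cooperation or competition, it helps to see the perceive → decide → act cycle end to end. `WandererAgent` below is a hypothetical standalone example that mirrors the `Agent` interface (it does not subclass the tutorial's `Agent`, so the snippet runs on its own):

```python
import math
import random

class WandererAgent:
    """Standalone illustration of the perceive -> decide -> act cycle."""

    def __init__(self):
        self.position = (0.0, 0.0)
        self.speed = 1.0

    def perceive(self, environment):
        # A real agent would also scan for nearby agents and resources
        return {"bounds": environment["bounds"]}

    def decide(self, perceptions):
        # Pick a random heading; bounds are ignored for brevity
        return {"action": "move", "angle": random.uniform(0, 2 * math.pi)}

    def act(self, decision):
        dx = math.cos(decision["angle"]) * self.speed
        dy = math.sin(decision["angle"]) * self.speed
        self.position = (self.position[0] + dx, self.position[1] + dy)
        return {"success": True}

    def update(self, environment):
        return self.act(self.decide(self.perceive(environment)))

agent = WandererAgent()
result = agent.update({"bounds": (100, 100)})
```

Each tick moves the agent exactly `self.speed` units in a random direction; swapping out `decide()` for something goal-directed is all it takes to change the behavior.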

Environment for Multi-Agent Systems

class MultiAgentEnvironment:
    def __init__(self, width=100, height=100):
        self.width = width
        self.height = height
        self.agents = {}
        self.obstacles = []
        self.resources = []
        self.time_step = 0
        self.global_state = {}

        # Environment properties
        self.gravity = 0.0
        self.friction = 0.1
        self.temperature = 20.0
        self.visibility = 1.0

    def add_agent(self, agent):
        """Add agent to environment"""
        self.agents[agent.id] = agent
        agent.environment = self
        # Refresh every agent's view of its peers, not just the newcomer's
        for existing in self.agents.values():
            existing.other_agents = {aid: a for aid, a in self.agents.items() if aid != existing.id}

    def remove_agent(self, agent_id):
        """Remove agent from environment"""
        if agent_id in self.agents:
            del self.agents[agent_id]
            # Update other agents' references
            for agent in self.agents.values():
                agent.other_agents = {aid: a for aid, a in self.agents.items() if aid != agent.id}

    def update(self):
        """Update all agents in environment"""
        self.time_step += 1

        # Update each agent
        for agent in self.agents.values():
            agent.update(self)

        # Update environment state
        self._update_environment_state()

    def _update_environment_state(self):
        """Update global environment state"""
        self.global_state = {
            "time_step": self.time_step,
            "agent_count": len(self.agents),
            "average_energy": sum(agent.energy for agent in self.agents.values()) / len(self.agents) if self.agents else 0,
            "resource_count": len(self.resources)
        }

    def get_agents_in_range(self, position, range_distance):
        """Get agents within range of a position"""
        agents_in_range = []
        for agent in self.agents.values():
            distance = ((agent.position[0] - position[0])**2 + 
                       (agent.position[1] - position[1])**2)**0.5
            if distance <= range_distance:
                agents_in_range.append(agent)
        return agents_in_range

    def add_resource(self, position, resource_type, value):
        """Add resource to environment"""
        self.resources.append({
            "position": position,
            "type": resource_type,
            "value": value,
            "id": str(uuid.uuid4())
        })

    def remove_resource(self, resource_id):
        """Remove resource from environment"""
        self.resources = [r for r in self.resources if r["id"] != resource_id]
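
With the classes above in place, a simulation is just: create the environment, register agents, tick `update()` every frame. The condensed stand-in below shows that loop on its own (the `StubAgent`/`StubEnvironment` names are placeholders with the same `update()` contract, so the block is self-contained):

```python
class StubAgent:
    """Stand-in with the same update() contract as the tutorial's Agent."""
    def __init__(self, agent_id):
        self.id = agent_id
        self.ticks = 0

    def update(self, environment):
        self.ticks += 1  # a real agent perceives, decides, and acts here
        return {"success": True}

class StubEnvironment:
    def __init__(self):
        self.agents = {}
        self.time_step = 0

    def add_agent(self, agent):
        self.agents[agent.id] = agent

    def update(self):
        self.time_step += 1
        # Iterate over a snapshot so agents can be removed mid-update
        for agent in list(self.agents.values()):
            agent.update(self)

env = StubEnvironment()
for i in range(3):
    env.add_agent(StubAgent(f"agent-{i}"))

for _ in range(100):  # 100 simulation ticks
    env.update()
```

Iterating over `list(self.agents.values())` is a small but useful design choice: it keeps the loop safe if an agent removes itself (or another agent) during its own update.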

Step 2: Cooperative Multi-Agent Systems

Cooperative Agent Implementation

class CooperativeAgent(Agent):
    def __init__(self, agent_id=None, name="CooperativeAgent"):
        super().__init__(agent_id, name)
        self.cooperation_level = 0.8
        self.trust_levels = {}  # agent_id -> trust_level
        self.shared_goals = []
        self.team_members = set()

        # Cooperation parameters
        self.help_threshold = 0.6
        self.sharing_threshold = 0.7
        self.leadership_tendency = 0.5

    def perceive(self, environment):
        """Perceive environment and other agents"""
        perceptions = {
            "self": {
                "position": self.position,
                "energy": self.energy,
                "state": self.state
            },
            "environment": {
                "resources": environment.resources,
                "obstacles": environment.obstacles,
                "global_state": environment.global_state
            },
            "other_agents": {}
        }

        # Perceive other agents in range
        nearby_agents = environment.get_agents_in_range(self.position, self.vision_range)
        for agent in nearby_agents:
            if agent.id != self.id:
                perceptions["other_agents"][agent.id] = {
                    "position": agent.position,
                    "state": agent.state,
                    "energy": agent.energy,
                    "distance": self._calculate_distance(agent.position)
                }

        # Check for messages
        perceptions["messages"] = self.communication_queue.copy()
        self.communication_queue.clear()

        return perceptions

    def decide(self, perceptions):
        """Make cooperative decisions"""
        decision = {
            "action": "idle",
            "target": None,
            "cooperation": False,
            "message": None
        }

        # Check for help requests
        help_requests = self._check_help_requests(perceptions)
        if help_requests and self.cooperation_level > self.help_threshold:
            decision = self._decide_to_help(help_requests)

        # Check for resource sharing opportunities
        elif self._should_share_resources(perceptions):
            decision = self._decide_to_share(perceptions)

        # Check for team coordination
        elif self._should_coordinate(perceptions):
            decision = self._decide_coordination(perceptions)

        # Default behavior
        else:
            decision = self._default_behavior(perceptions)

        return decision

    def act(self, decision):
        """Execute cooperative action"""
        result = {"success": False, "message": None}

        if decision["action"] == "help":
            result = self._execute_help(decision)
        elif decision["action"] == "share":
            result = self._execute_sharing(decision)
        elif decision["action"] == "coordinate":
            result = self._execute_coordination(decision)
        elif decision["action"] == "move":
            result = self._execute_movement(decision)
        elif decision["action"] == "communicate":
            result = self._execute_communication(decision)

        return result

    def _check_help_requests(self, perceptions):
        """Check for help requests from other agents"""
        help_requests = []
        for message in perceptions.get("messages", []):
            if message["message"].get("type") == "help_request":
                help_requests.append(message)
        return help_requests

    def _should_share_resources(self, perceptions):
        """Check if should share resources with other agents"""
        if not perceptions["other_agents"]:
            return False

        # Check if other agents need resources
        for agent_id, agent_info in perceptions["other_agents"].items():
            if agent_info["energy"] < 30:  # Low energy threshold
                return True

        return False

    def _should_coordinate(self, perceptions):
        """Check if should coordinate with other agents"""
        return len(perceptions["other_agents"]) > 1

    def _decide_to_help(self, help_requests):
        """Decide how to help other agents"""
        # Choose the most urgent help request
        urgent_request = max(help_requests, key=lambda r: r["message"].get("urgency", 0))

        return {
            "action": "help",
            "target": urgent_request["sender"],
            "cooperation": True,
            "message": {"type": "help_response", "content": "I'll help you!"}
        }

    def _decide_to_share(self, perceptions):
        """Decide what to share with other agents"""
        # Find agent with lowest energy
        lowest_energy_agent = min(
            perceptions["other_agents"].items(),
            key=lambda x: x[1]["energy"]
        )

        return {
            "action": "share",
            "target": lowest_energy_agent[0],
            "cooperation": True,
            "message": {"type": "resource_offer", "content": "I can share some energy with you"}
        }

    def _decide_coordination(self, perceptions):
        """Decide how to coordinate with other agents"""
        # Simple coordination: move towards center of nearby agents
        if len(perceptions["other_agents"]) > 0:
            center_x = sum(agent["position"][0] for agent in perceptions["other_agents"].values()) / len(perceptions["other_agents"])
            center_y = sum(agent["position"][1] for agent in perceptions["other_agents"].values()) / len(perceptions["other_agents"])

            return {
                "action": "move",
                "target": (center_x, center_y),
                "cooperation": True,
                "message": {"type": "coordination", "content": "Moving to coordinate"}
            }

        return self._default_behavior(perceptions)

    def _default_behavior(self, perceptions):
        """Default behavior when no cooperation is needed"""
        # Look for resources
        if perceptions["environment"]["resources"]:
            nearest_resource = min(
                perceptions["environment"]["resources"],
                key=lambda r: self._calculate_distance(r["position"])
            )
            return {
                "action": "move",
                "target": nearest_resource["position"],
                "cooperation": False,
                "message": None
            }

        return {
            "action": "idle",
            "target": None,
            "cooperation": False,
            "message": None
        }

    def _execute_help(self, decision):
        """Execute help action"""
        target_agent = self.other_agents.get(decision["target"])
        if target_agent:
            # Transfer energy to help
            energy_to_transfer = min(20, self.energy * 0.2)
            self.energy -= energy_to_transfer
            target_agent.energy += energy_to_transfer

            # Update trust
            if decision["target"] not in self.trust_levels:
                self.trust_levels[decision["target"]] = 0.5
            self.trust_levels[decision["target"]] = min(1.0, self.trust_levels[decision["target"]] + 0.1)

            return {"success": True, "message": f"Helped {target_agent.name}"}

        return {"success": False, "message": "Target agent not found"}

    def _execute_sharing(self, decision):
        """Execute resource sharing"""
        target_agent = self.other_agents.get(decision["target"])
        if target_agent:
            # Share energy
            energy_to_share = min(15, self.energy * 0.15)
            self.energy -= energy_to_share
            target_agent.energy += energy_to_share

            return {"success": True, "message": f"Shared resources with {target_agent.name}"}

        return {"success": False, "message": "Target agent not found"}

    def _execute_coordination(self, decision):
        """Execute coordination action"""
        if decision["target"]:
            # Move towards target position
            self._move_towards(decision["target"])
            return {"success": True, "message": "Coordinated movement"}

        return {"success": False, "message": "No target for coordination"}

    def _execute_movement(self, decision):
        """Execute movement action"""
        if decision["target"]:
            self._move_towards(decision["target"])
            return {"success": True, "message": "Moved towards target"}

        return {"success": False, "message": "No movement target"}

    def _execute_communication(self, decision):
        """Execute communication action"""
        if decision["message"]:
            self.communicate(decision["message"], decision.get("target"))
            return {"success": True, "message": "Communication sent"}

        return {"success": False, "message": "No message to send"}

    def _move_towards(self, target_position):
        """Move towards target position"""
        dx = target_position[0] - self.position[0]
        dy = target_position[1] - self.position[1]
        distance = (dx**2 + dy**2)**0.5

        if distance > 0:
            # Normalize direction
            dx /= distance
            dy /= distance

            # Move with speed limit
            move_distance = min(self.speed, distance)
            self.position = (
                self.position[0] + dx * move_distance,
                self.position[1] + dy * move_distance
            )

            # Consume energy
            self.energy = max(0, self.energy - move_distance * 0.1)
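
The help mechanic is easy to verify in isolation. Below is a hypothetical standalone version of the energy-transfer-and-trust rule from `_execute_help` (the `execute_help` function and dict-based agents are illustrative, not part of the classes above):

```python
def execute_help(helper, target, trust_levels):
    """Transfer up to 20 energy (capped at 20% of the helper's reserve)
    and raise trust toward the helped agent, as in _execute_help."""
    transfer = min(20, helper["energy"] * 0.2)
    helper["energy"] -= transfer
    target["energy"] += transfer
    # Trust starts at 0.5 and climbs by 0.1 per successful help, capped at 1.0
    prior = trust_levels.get(target["id"], 0.5)
    trust_levels[target["id"]] = min(1.0, prior + 0.1)
    return transfer

helper = {"id": "a", "energy": 100.0}
target = {"id": "b", "energy": 25.0}
trust = {}
moved = execute_help(helper, target, trust)
# moved == 20.0; helper drops to 80.0, target rises to 45.0, trust["b"] == 0.6
```

Capping the transfer at a fraction of the helper's own reserve is what keeps altruism from being self-destructive: a nearly drained agent gives almost nothing.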

Step 3: Competitive Multi-Agent Systems

Competitive Agent Implementation

class CompetitiveAgent(Agent):
    def __init__(self, agent_id=None, name="CompetitiveAgent"):
        super().__init__(agent_id, name)
        self.aggression_level = 0.7
        self.competitiveness = 0.8
        self.strategy = "aggressive"  # aggressive, defensive, opportunistic
        self.rivals = set()
        self.allies = set()

        # Competitive parameters
        self.attack_threshold = 0.6
        self.defense_threshold = 0.4
        self.opportunity_threshold = 0.5

    def perceive(self, environment):
        """Perceive environment with competitive focus"""
        perceptions = super().perceive(environment)

        # Add competitive information
        perceptions["threats"] = self._identify_threats(perceptions)
        perceptions["opportunities"] = self._identify_opportunities(perceptions)
        perceptions["rivals"] = self._identify_rivals(perceptions)

        return perceptions

    def decide(self, perceptions):
        """Make competitive decisions"""
        decision = {
            "action": "idle",
            "target": None,
            "aggressive": False,
            "message": None
        }

        # Check for attack opportunities
        if self._should_attack(perceptions):
            decision = self._decide_attack(perceptions)

        # Check for defense needs
        elif self._should_defend(perceptions):
            decision = self._decide_defense(perceptions)

        # Check for opportunistic actions
        elif self._should_exploit_opportunity(perceptions):
            decision = self._decide_opportunity(perceptions)

        # Default competitive behavior
        else:
            decision = self._default_competitive_behavior(perceptions)

        return decision

    def _identify_threats(self, perceptions):
        """Identify potential threats"""
        threats = []
        my_energy = max(self.energy, 1e-6)  # guard against division by zero
        for agent_id, agent_info in perceptions["other_agents"].items():
            # Consider agents with noticeably higher energy as threats
            if agent_info["energy"] > my_energy * 1.2:
                threats.append({
                    "agent_id": agent_id,
                    "threat_level": agent_info["energy"] / my_energy,
                    "distance": agent_info["distance"]
                })
        return threats

    def _identify_opportunities(self, perceptions):
        """Identify opportunities for competitive advantage"""
        opportunities = []

        # Look for weak agents (guard the ratio against zero energy)
        for agent_id, agent_info in perceptions["other_agents"].items():
            if agent_info["energy"] < self.energy * 0.8:
                opportunities.append({
                    "type": "weak_agent",
                    "agent_id": agent_id,
                    "advantage": self.energy / max(agent_info["energy"], 1e-6)
                })

        # Look for resources
        for resource in perceptions["environment"]["resources"]:
            opportunities.append({
                "type": "resource",
                "position": resource["position"],
                "value": resource["value"]
            })

        return opportunities

    def _identify_rivals(self, perceptions):
        """Identify rival agents"""
        rivals = []
        for agent_id, agent_info in perceptions["other_agents"].items():
            if agent_id in self.rivals:
                rivals.append({
                    "agent_id": agent_id,
                    "rivalry_level": self._calculate_rivalry_level(agent_id),
                    "distance": agent_info["distance"]
                })
        return rivals

    def _should_attack(self, perceptions):
        """Check if should attack another agent"""
        if not perceptions["threats"] and not perceptions["opportunities"]:
            return False

        # Attack if we have advantage
        for opportunity in perceptions["opportunities"]:
            if opportunity["type"] == "weak_agent" and opportunity["advantage"] > 1.5:
                return True

        return False

    def _should_defend(self, perceptions):
        """Check if should defend against threats"""
        if not perceptions["threats"]:
            return False

        # Defend if threatened
        for threat in perceptions["threats"]:
            if threat["threat_level"] > 1.5 and threat["distance"] < 5:
                return True

        return False

    def _should_exploit_opportunity(self, perceptions):
        """Check if should exploit an opportunity"""
        if not perceptions["opportunities"]:
            return False

        # Exploit if opportunity is good
        for opportunity in perceptions["opportunities"]:
            if opportunity["type"] == "resource" and opportunity["value"] > 50:
                return True

        return False

    def _decide_attack(self, perceptions):
        """Decide how to attack"""
        # Find best target
        best_target = None
        best_advantage = 0

        for opportunity in perceptions["opportunities"]:
            if opportunity["type"] == "weak_agent":
                if opportunity["advantage"] > best_advantage:
                    best_advantage = opportunity["advantage"]
                    best_target = opportunity["agent_id"]

        if best_target:
            return {
                "action": "attack",
                "target": best_target,
                "aggressive": True,
                "message": {"type": "threat", "content": "You're going down!"}
            }

        return self._default_competitive_behavior(perceptions)

    def _decide_defense(self, perceptions):
        """Decide how to defend"""
        # Find nearest threat
        nearest_threat = min(perceptions["threats"], key=lambda t: t["distance"])

        # Move away from threat
        threat_agent = self.other_agents.get(nearest_threat["agent_id"])
        if threat_agent:
            # Calculate escape direction
            dx = self.position[0] - threat_agent.position[0]
            dy = self.position[1] - threat_agent.position[1]
            distance = (dx**2 + dy**2)**0.5

            if distance > 0:
                escape_x = self.position[0] + (dx / distance) * 10
                escape_y = self.position[1] + (dy / distance) * 10

                return {
                    "action": "move",
                    "target": (escape_x, escape_y),
                    "aggressive": False,
                    "message": {"type": "defense", "content": "Retreating!"}
                }

        return self._default_competitive_behavior(perceptions)

    def _decide_opportunity(self, perceptions):
        """Decide how to exploit opportunity"""
        # Find best opportunity
        best_opportunity = max(perceptions["opportunities"], key=lambda o: o.get("value", 0))

        if best_opportunity["type"] == "resource":
            return {
                "action": "move",
                "target": best_opportunity["position"],
                "aggressive": False,
                "message": {"type": "opportunity", "content": "Going for the resource!"}
            }

        return self._default_competitive_behavior(perceptions)

    def _default_competitive_behavior(self, perceptions):
        """Default competitive behavior"""
        # Look for resources or weak agents
        if perceptions["opportunities"]:
            best_opportunity = max(perceptions["opportunities"], key=lambda o: o.get("value", 0))
            if best_opportunity["type"] == "resource":
                return {
                    "action": "move",
                    "target": best_opportunity["position"],
                    "aggressive": False,
                    "message": None
                }

        return {
            "action": "idle",
            "target": None,
            "aggressive": False,
            "message": None
        }

    def _calculate_rivalry_level(self, agent_id):
        """Calculate rivalry level with another agent"""
        # Simple rivalry calculation
        if agent_id in self.rivals:
            return 0.8
        return 0.2

    def act(self, decision):
        """Execute competitive action.

        Agent declares act() abstract, so CompetitiveAgent must implement it
        to be instantiable. The attack numbers below are illustrative; tune
        them for your game.
        """
        result = {"success": False, "message": None}

        if decision["action"] == "attack":
            target_agent = self.other_agents.get(decision["target"])
            if target_agent:
                # Drain some of the target's energy at a small cost to ourselves
                damage = min(15, self.energy * 0.1)
                target_agent.energy = max(0, target_agent.energy - damage)
                self.energy = max(0, self.energy - damage * 0.3)
                result = {"success": True, "message": f"Attacked {target_agent.name}"}
        elif decision["action"] == "move" and decision["target"]:
            dx = decision["target"][0] - self.position[0]
            dy = decision["target"][1] - self.position[1]
            distance = (dx**2 + dy**2)**0.5
            if distance > 0:
                step = min(self.speed, distance)
                self.position = (
                    self.position[0] + (dx / distance) * step,
                    self.position[1] + (dy / distance) * step
                )
                self.energy = max(0, self.energy - step * 0.1)
            result = {"success": True, "message": "Moved towards target"}

        # Deliver any message attached to the decision
        if decision.get("message"):
            self.communicate(decision["message"], decision.get("target"))

        return result
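
The attack decision ultimately reduces to an energy-ratio test against a threshold. Pulling that rule out into a pure function (a hypothetical `should_attack` sketch mirroring the 1.5x advantage check in `_should_attack`) makes it easy to tune and unit-test:

```python
def should_attack(my_energy, opponent_energy, advantage_threshold=1.5):
    """Attack only with a clear energy advantage (mirrors _should_attack)."""
    if opponent_energy <= 0:
        return True  # opponent has nothing left to fight with
    return my_energy / opponent_energy > advantage_threshold

print(should_attack(90, 50))  # 1.8x advantage -> True
print(should_attack(60, 50))  # 1.2x advantage -> False
```

Raising `advantage_threshold` makes agents more cautious, lowering it makes them reckless; varying it per agent is a cheap way to give each one a distinct personality.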

Step 4: Swarm Intelligence Systems

Swarm Agent Implementation

class SwarmAgent(Agent):
    def __init__(self, agent_id=None, name="SwarmAgent"):
        super().__init__(agent_id, name)
        self.swarm_id = None
        self.swarm_center = (0, 0)
        self.swarm_velocity = (0, 0)
        self.swarm_size = 0

        # Swarm behavior parameters
        self.separation_weight = 1.0
        self.alignment_weight = 1.0
        self.cohesion_weight = 1.0
        self.avoidance_weight = 2.0

        # Swarm communication
        self.swarm_messages = []
        self.consensus_threshold = 0.7

    def perceive(self, environment):
        """Perceive environment with swarm focus"""
        perceptions = super().perceive(environment)

        # Add swarm-specific information
        perceptions["swarm_members"] = self._identify_swarm_members(perceptions)
        perceptions["swarm_center"] = self._calculate_swarm_center(perceptions)
        perceptions["swarm_velocity"] = self._calculate_swarm_velocity(perceptions)
        perceptions["swarm_messages"] = self.swarm_messages.copy()
        self.swarm_messages.clear()

        return perceptions

    def decide(self, perceptions):
        """Make swarm-based decisions"""
        decision = {
            "action": "swarm_move",
            "target": None,
            "swarm_behavior": True,
            "message": None
        }

        # Apply swarm rules
        if perceptions["swarm_members"]:
            # Calculate swarm forces
            separation_force = self._calculate_separation_force(perceptions)
            alignment_force = self._calculate_alignment_force(perceptions)
            cohesion_force = self._calculate_cohesion_force(perceptions)
            avoidance_force = self._calculate_avoidance_force(perceptions)

            # Combine forces
            total_force = self._combine_forces(
                separation_force, alignment_force, 
                cohesion_force, avoidance_force
            )

            # Calculate new position
            new_position = (
                self.position[0] + total_force[0],
                self.position[1] + total_force[1]
            )

            decision["target"] = new_position
            decision["swarm_behavior"] = True

        # Check for swarm consensus
        if self._should_reach_consensus(perceptions):
            decision["message"] = self._create_consensus_message(perceptions)

        return decision

    def _identify_swarm_members(self, perceptions):
        """Identify other agents in the swarm"""
        swarm_members = []
        for agent_id, agent_info in perceptions["other_agents"].items():
            if agent_info["distance"] <= self.communication_range:
                swarm_members.append({
                    "agent_id": agent_id,
                    "position": agent_info["position"],
                    "velocity": agent_info.get("velocity", (0, 0)),
                    "distance": agent_info["distance"]
                })
        return swarm_members

    def _calculate_swarm_center(self, perceptions):
        """Calculate center of swarm"""
        if not perceptions["swarm_members"]:
            return self.position

        center_x = sum(member["position"][0] for member in perceptions["swarm_members"]) / len(perceptions["swarm_members"])
        center_y = sum(member["position"][1] for member in perceptions["swarm_members"]) / len(perceptions["swarm_members"])

        return (center_x, center_y)

    def _calculate_swarm_velocity(self, perceptions):
        """Calculate average velocity of swarm"""
        if not perceptions["swarm_members"]:
            return (0, 0)

        avg_vx = sum(member["velocity"][0] for member in perceptions["swarm_members"]) / len(perceptions["swarm_members"])
        avg_vy = sum(member["velocity"][1] for member in perceptions["swarm_members"]) / len(perceptions["swarm_members"])

        return (avg_vx, avg_vy)

    def _calculate_separation_force(self, perceptions):
        """Calculate separation force to avoid crowding"""
        separation_force = (0, 0)

        for member in perceptions["swarm_members"]:
            distance = member["distance"]
            if distance > 0 and distance < 2:  # Too close
                # Calculate repulsion force
                dx = self.position[0] - member["position"][0]
                dy = self.position[1] - member["position"][1]

                # Normalize and scale
                force_magnitude = self.separation_weight / (distance * distance)
                separation_force = (
                    separation_force[0] + (dx / distance) * force_magnitude,
                    separation_force[1] + (dy / distance) * force_magnitude
                )

        return separation_force

    def _calculate_alignment_force(self, perceptions):
        """Calculate alignment force to match swarm velocity"""
        if not perceptions["swarm_members"]:
            return (0, 0)

        swarm_velocity = perceptions["swarm_velocity"]
        current_velocity = (0, 0)  # Simplified - would track actual velocity

        alignment_force = (
            (swarm_velocity[0] - current_velocity[0]) * self.alignment_weight,
            (swarm_velocity[1] - current_velocity[1]) * self.alignment_weight
        )

        return alignment_force

    def _calculate_cohesion_force(self, perceptions):
        """Calculate cohesion force to move towards swarm center"""
        swarm_center = perceptions["swarm_center"]

        dx = swarm_center[0] - self.position[0]
        dy = swarm_center[1] - self.position[1]
        distance = (dx**2 + dy**2)**0.5

        if distance > 0:
            cohesion_force = (
                (dx / distance) * self.cohesion_weight,
                (dy / distance) * self.cohesion_weight
            )
        else:
            cohesion_force = (0, 0)

        return cohesion_force

    def _calculate_avoidance_force(self, perceptions):
        """Calculate avoidance force to avoid obstacles"""
        avoidance_force = (0, 0)

        # Avoid obstacles
        for obstacle in perceptions["environment"]["obstacles"]:
            distance = self._calculate_distance(obstacle["position"])
            if distance < 5:  # Too close to obstacle
                dx = self.position[0] - obstacle["position"][0]
                dy = self.position[1] - obstacle["position"][1]

                if distance > 0:
                    force_magnitude = self.avoidance_weight / distance
                    avoidance_force = (
                        avoidance_force[0] + (dx / distance) * force_magnitude,
                        avoidance_force[1] + (dy / distance) * force_magnitude
                    )

        return avoidance_force

    def _combine_forces(self, separation, alignment, cohesion, avoidance):
        """Combine all forces into final movement"""
        total_force = (
            separation[0] + alignment[0] + cohesion[0] + avoidance[0],
            separation[1] + alignment[1] + cohesion[1] + avoidance[1]
        )

        # Limit force magnitude
        magnitude = (total_force[0]**2 + total_force[1]**2)**0.5
        if magnitude > self.speed:
            total_force = (
                (total_force[0] / magnitude) * self.speed,
                (total_force[1] / magnitude) * self.speed
            )

        return total_force

    def _should_reach_consensus(self, perceptions):
        """Check whether the agent should seek consensus with the swarm"""
        return len(perceptions["swarm_members"]) > 3

    def _create_consensus_message(self, perceptions):
        """Create consensus message for swarm"""
        return {
            "type": "swarm_consensus",
            "content": "Let's move together",
            "swarm_center": perceptions["swarm_center"],
            "swarm_velocity": perceptions["swarm_velocity"]
        }
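The four steering forces above can also be exercised outside the full SwarmAgent class. The sketch below is a minimal standalone version (hypothetical helper functions, plain tuples for vectors; the weights and radius are illustrative assumptions) that mirrors the separation, cohesion, and force-capping logic so each piece can be checked in isolation:

```python
import math

def cohesion_force(position, neighbors, weight=1.0):
    """Steer toward the average position of neighbors (cohesion)."""
    if not neighbors:
        return (0.0, 0.0)
    cx = sum(p[0] for p in neighbors) / len(neighbors)
    cy = sum(p[1] for p in neighbors) / len(neighbors)
    dx, dy = cx - position[0], cy - position[1]
    dist = math.hypot(dx, dy)
    if dist == 0:
        return (0.0, 0.0)
    return ((dx / dist) * weight, (dy / dist) * weight)

def separation_force(position, neighbors, weight=1.0, radius=2.0):
    """Repel from neighbors closer than radius, scaled by 1/d^2."""
    fx = fy = 0.0
    for p in neighbors:
        dx, dy = position[0] - p[0], position[1] - p[1]
        dist = math.hypot(dx, dy)
        if 0 < dist < radius:
            mag = weight / (dist * dist)
            fx += (dx / dist) * mag
            fy += (dy / dist) * mag
    return (fx, fy)

def clamp(force, max_speed):
    """Limit the combined force magnitude, as in _combine_forces."""
    mag = math.hypot(*force)
    if mag > max_speed:
        return ((force[0] / mag) * max_speed, (force[1] / mag) * max_speed)
    return force

# Two close neighbors and one distant one
neighbors = [(1.0, 0.0), (0.0, 1.0), (5.0, 5.0)]
coh = cohesion_force((0.0, 0.0), neighbors)
sep = separation_force((0.0, 0.0), neighbors)
total = clamp((coh[0] + sep[0], coh[1] + sep[1]), max_speed=1.5)
```

Testing forces this way makes debugging much easier than observing the full swarm: you can verify, for example, that only the two neighbors inside the separation radius contribute repulsion.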

Step 5: Multi-Agent System Testing

Test Suite for Multi-Agent Systems

import time

def test_multi_agent_system():
    """Test multi-agent system functionality"""
    print("Testing Multi-Agent AI System")
    print("=" * 50)

    # Create environment
    environment = MultiAgentEnvironment(100, 100)

    # Create cooperative agents
    cooperative_agents = []
    for i in range(5):
        agent = CooperativeAgent(name=f"CooperativeAgent_{i}")
        agent.position = (i * 10, i * 10)
        environment.add_agent(agent)
        cooperative_agents.append(agent)

    # Create competitive agents
    competitive_agents = []
    for i in range(3):
        agent = CompetitiveAgent(name=f"CompetitiveAgent_{i}")
        agent.position = (50 + i * 15, 50 + i * 15)
        environment.add_agent(agent)
        competitive_agents.append(agent)

    # Create swarm agents
    swarm_agents = []
    for i in range(8):
        agent = SwarmAgent(name=f"SwarmAgent_{i}")
        agent.position = (20 + i * 5, 20 + i * 5)
        environment.add_agent(agent)
        swarm_agents.append(agent)

    # Add resources
    for i in range(10):
        environment.add_resource(
            (i * 10, i * 10), 
            "energy", 
            50
        )

    # Test system for multiple time steps
    print("Running multi-agent simulation...")
    for time_step in range(100):
        environment.update()

        # Check for emergent behaviors
        if time_step % 20 == 0:
            print(f"Time step {time_step}:")
            print(f"  Agents: {len(environment.agents)}")
            print(f"  Resources: {len(environment.resources)}")
            print(f"  Average energy: {environment.global_state['average_energy']:.2f}")

    # Test cooperation
    print("\nTesting cooperation...")
    agent1 = cooperative_agents[0]
    agent2 = cooperative_agents[1]

    # Simulate a help request from the low-energy agent
    agent2.energy = 20  # Low energy
    agent2.communicate({"type": "help_request", "urgency": 0.8}, agent1.id)

    # Test cooperation response
    perceptions = agent1.perceive(environment)
    decision = agent1.decide(perceptions)
    assert decision["cooperation"] is True
    print("āœ“ Cooperation working")

    # Test competition
    print("Testing competition...")
    comp_agent1 = competitive_agents[0]
    comp_agent2 = competitive_agents[1]

    # Simulate competitive scenario
    comp_agent2.energy = 30  # Lower energy
    perceptions = comp_agent1.perceive(environment)
    decision = comp_agent1.decide(perceptions)

    # Should show competitive behavior
    assert decision["aggressive"] or decision["action"] != "idle"
    print("āœ“ Competition working")

    # Test swarm behavior
    print("Testing swarm behavior...")
    swarm_agent = swarm_agents[0]
    perceptions = swarm_agent.perceive(environment)
    decision = swarm_agent.decide(perceptions)

    # Should show swarm behavior
    assert decision["swarm_behavior"] is True
    print("āœ“ Swarm behavior working")

    # Test performance
    print("Testing performance...")
    start_time = time.time()
    for i in range(50):
        environment.update()
    end_time = time.time()

    avg_time = (end_time - start_time) / 50
    assert avg_time < 0.1  # Should update within 0.1 seconds
    print(f"āœ“ Performance test passed (avg: {avg_time:.3f}s)")

    print("\nšŸŽ‰ All multi-agent tests passed!")

if __name__ == "__main__":
    test_multi_agent_system()

Best Practices for Multi-Agent Systems

1. System Design

  • Design clear agent roles and responsibilities
  • Implement efficient communication protocols
  • Balance cooperation and competition for interesting dynamics
  • Plan for scalability from the beginning
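A clean communication protocol is easiest to enforce with a central message bus, so agents never hold direct references to each other. The sketch below is a minimal publish/subscribe bus (the `MessageBus` class and its method names are assumptions for illustration, not part of the tutorial's classes):

```python
from collections import defaultdict

class MessageBus:
    """Simple publish/subscribe bus decoupling agents from each other."""
    def __init__(self):
        self.inboxes = defaultdict(list)   # agent_id -> pending messages
        self.topics = defaultdict(set)     # topic -> subscribed agent_ids

    def subscribe(self, agent_id, topic):
        self.topics[topic].add(agent_id)

    def send(self, recipient_id, message):
        """Direct message to a single agent."""
        self.inboxes[recipient_id].append(message)

    def broadcast(self, topic, message):
        """Deliver a message to every subscriber of a topic."""
        for agent_id in self.topics[topic]:
            self.inboxes[agent_id].append(message)

    def receive(self, agent_id):
        """Drain and return an agent's inbox."""
        messages, self.inboxes[agent_id] = self.inboxes[agent_id], []
        return messages

bus = MessageBus()
bus.subscribe("agent_1", "help_requests")
bus.subscribe("agent_2", "help_requests")
bus.broadcast("help_requests", {"type": "help_request", "urgency": 0.8})
bus.send("agent_1", {"type": "swarm_consensus", "content": "regroup"})
```

Routing everything through one bus also gives you a single place to log, throttle, or inspect traffic, which pays off when debugging large systems.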

2. Performance Optimization

  • Use spatial partitioning for large numbers of agents
  • Implement efficient collision detection and range queries
  • Cache frequently used calculations and perceptions
  • Monitor performance metrics and optimize bottlenecks
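Spatial partitioning is usually the biggest single win for large agent counts. A minimal uniform-grid sketch (a hypothetical `SpatialGrid`, not the tutorial's environment class) turns each range query from a scan over all agents into a scan over a handful of nearby cells:

```python
from collections import defaultdict
import math

class SpatialGrid:
    """Uniform grid for fast range queries over agent positions."""
    def __init__(self, cell_size):
        self.cell_size = cell_size
        self.cells = defaultdict(list)  # (cx, cy) -> [(agent_id, position)]

    def _cell(self, position):
        return (int(position[0] // self.cell_size),
                int(position[1] // self.cell_size))

    def insert(self, agent_id, position):
        self.cells[self._cell(position)].append((agent_id, position))

    def query(self, position, radius):
        """Return agent ids within radius, checking only nearby cells."""
        cx, cy = self._cell(position)
        reach = int(math.ceil(radius / self.cell_size))
        nearby = []
        for dx in range(-reach, reach + 1):
            for dy in range(-reach, reach + 1):
                for agent_id, pos in self.cells.get((cx + dx, cy + dy), []):
                    if math.hypot(pos[0] - position[0],
                                  pos[1] - position[1]) <= radius:
                        nearby.append(agent_id)
        return nearby

grid = SpatialGrid(cell_size=10)
grid.insert("a", (5, 5))
grid.insert("b", (8, 5))
grid.insert("c", (90, 90))
neighbors = grid.query((5, 5), radius=10)
```

For moving agents, the simplest approach is to rebuild the grid once per frame before the perception phase; the rebuild is linear in the number of agents and typically cheaper than the queries it saves.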

3. Emergent Behavior

  • Design simple rules that lead to complex behaviors
  • Test for emergent properties and unexpected interactions
  • Document emergent behaviors for future reference
  • Balance predictability and surprise in agent behavior

4. Testing and Validation

  • Test individual agent behaviors before system integration
  • Validate emergent behaviors through extensive simulation
  • Monitor system performance under various conditions
  • Implement debugging tools for complex interactions
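One practical debugging tool is a lightweight interaction log that records who interacted with whom at each time step, so patterns can be counted after a run instead of eyeballed live. A minimal sketch (the `InteractionLog` class and its fields are illustrative assumptions):

```python
from collections import Counter

class InteractionLog:
    """Record agent interactions per time step for post-run analysis."""
    def __init__(self):
        self.events = []

    def record(self, time_step, sender, receiver, kind):
        self.events.append({"t": time_step, "from": sender,
                            "to": receiver, "kind": kind})

    def counts_by_kind(self):
        """How often each interaction type occurred."""
        return Counter(e["kind"] for e in self.events)

    def partners_of(self, agent_id):
        """Every agent this agent interacted with, in either direction."""
        return ({e["to"] for e in self.events if e["from"] == agent_id} |
                {e["from"] for e in self.events if e["to"] == agent_id})

log = InteractionLog()
log.record(0, "agent_1", "agent_2", "help_request")
log.record(1, "agent_2", "agent_1", "help_response")
log.record(1, "agent_3", "agent_1", "attack")
```

Summaries like `counts_by_kind()` make it easy to spot anomalies, such as cooperation messages that are sent but never answered.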

Next Steps

Congratulations! You've learned how to build multi-agent AI systems. Here's what to do next:

1. Practice with Advanced Features

  • Implement more sophisticated communication protocols
  • Build complex emergent behaviors
  • Create hybrid systems with both cooperation and competition
  • Experiment with different swarm algorithms

2. Explore Procedural Content Generation

  • Learn about AI-powered content generation
  • Implement procedural quest and story systems
  • Create dynamic world generation
  • Build adaptive content systems

3. Build Your Projects

  • Create multi-agent game systems
  • Implement swarm intelligence
  • Build cooperative and competitive AI
  • Share your work with the community

Conclusion

You've learned how to build sophisticated multi-agent AI systems. You now understand:

  • How to design and implement multi-agent architectures
  • How to create cooperative and competitive AI systems
  • How to implement swarm intelligence and emergent behaviors
  • How to optimize performance for multiple agents
  • How to test and validate complex multi-agent systems

Your AI systems can now exhibit complex emergent behaviors through the interactions of multiple autonomous agents. This foundation will serve you well as you continue to explore advanced AI game development techniques.

Ready for the next step? Continue with Procedural Content Generation with AI to learn how to use AI to generate game content dynamically.


This tutorial is part of the GamineAI Intermediate Tutorial Series. Learn advanced AI techniques, build sophisticated systems, and create professional-grade AI-powered games.