Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
skills/multi-agent-patterns/scripts/coordination.py
1"""2Multi-Agent Coordination Utilities34Provides reusable building blocks for multi-agent coordination patterns:5supervisor/orchestrator, peer-to-peer handoffs, consensus mechanisms,6and failure handling with circuit breakers.78Use when: building multi-agent systems that need structured communication,9task delegation, consensus voting, or fault-tolerant agent coordination.1011Designed for composability — import individual classes or use the12``if __name__ == "__main__"`` demo to see all patterns in action.13"""1415from typing import Dict, List, Any, Optional16from dataclasses import dataclass, field17from enum import Enum18import time19import uuid2021__all__ = [22"MessageType",23"AgentMessage",24"AgentCommunication",25"SupervisorAgent",26"HandoffProtocol",27"ConsensusManager",28"AgentFailureHandler",29]303132class MessageType(Enum):33"""Types of messages exchanged between agents."""3435REQUEST = "request"36RESPONSE = "response"37HANDOVER = "handover"38FEEDBACK = "feedback"39ALERT = "alert"404142@dataclass43class AgentMessage:44"""Message exchanged between agents.4546Use when: agents need a structured envelope for inter-agent communication47that carries sender/receiver identity, type, priority, and payload.48"""4950sender: str51receiver: str52message_type: MessageType53content: Dict[str, Any]54timestamp: float = field(default_factory=time.time)55message_id: str = field(default_factory=lambda: str(uuid.uuid4()))56requires_response: bool = False57priority: int = 0 # 0 = normal, higher = more urgent585960class AgentCommunication:61"""Communication channel for multi-agent systems.6263Use when: multiple agents need an in-process message bus for sending,64receiving, and broadcasting messages with history tracking.65"""6667def __init__(self) -> None:68self.inbox: Dict[str, List[AgentMessage]] = {}69self.outbox: List[AgentMessage] = []70self.message_history: List[AgentMessage] = []7172def send(self, message: AgentMessage) -> None:73"""Send a message to an agent."""74if message.receiver not in self.inbox:75self.inbox[message.receiver] = []76self.inbox[message.receiver].append(message)77self.outbox.append(message)78self.message_history.append(message)7980def receive(self, agent_id: str) -> List[AgentMessage]:81"""Receive all messages for an agent, clearing its inbox."""82messages = self.inbox.get(agent_id, [])83self.inbox[agent_id] = []84return messages8586def broadcast(87self,88sender: str,89message_type: MessageType,90content: Dict[str, Any],91receivers: List[str],92) -> None:93"""Broadcast a message to multiple agents."""94for receiver in receivers:95self.send(96AgentMessage(97sender=sender,98receiver=receiver,99message_type=message_type,100content=content,101)102)103104105# ---------------------------------------------------------------------------106# Supervisor Pattern107# ---------------------------------------------------------------------------108109110class SupervisorAgent:111"""Central supervisor agent that coordinates worker agents.112113Use when: tasks have clear decomposition and a single coordinator should114delegate subtasks, track worker status, and aggregate results.115"""116117def __init__(self, name: str, communication: AgentCommunication) -> None:118self.name = name119self.communication = communication120self.workers: Dict[str, Dict[str, Any]] = {}121self.task_queue: List[Dict[str, Any]] = []122self.completed_tasks: List[Dict[str, Any]] = []123self.current_state: Dict[str, Any] = {}124125def register_worker(self, worker_id: str, capabilities: List[str]) -> None:126"""Register a worker agent with the supervisor."""127self.workers[worker_id] = {128"capabilities": capabilities,129"status": "available",130"current_task": None,131"metrics": {"tasks_completed": 0, "avg_response_time": 0.0},132}133134def decompose_task(self, task: Dict[str, Any]) -> List[Dict[str, Any]]:135"""Decompose a task into subtasks.136137Use when: a high-level task needs to be broken into assignable units.138In production, replace the rule-based logic with LLM-driven planning.139"""140subtasks: List[Dict[str, Any]] = []141task_type = task.get("type", "general")142143if task_type == "research":144subtasks = [145{"type": "search", "description": "Gather information"},146{"type": "analyze", "description": "Analyze findings"},147{"type": "synthesize", "description": "Synthesize results"},148]149elif task_type == "create":150subtasks = [151{"type": "plan", "description": "Create plan"},152{"type": "draft", "description": "Draft content"},153{"type": "review", "description": "Review and refine"},154]155else:156subtasks = [157{158"type": "execute",159"description": task.get("description", "Execute task"),160}161]162163for subtask in subtasks:164subtask["parent_task"] = task.get("id")165subtask["priority"] = task.get("priority", 0)166167return subtasks168169def assign_task(self, subtask: Dict[str, Any], worker_id: str) -> None:170"""Assign a subtask to a worker agent."""171if worker_id not in self.workers:172raise ValueError(f"Unknown worker: {worker_id}")173174self.workers[worker_id]["status"] = "busy"175self.workers[worker_id]["current_task"] = subtask.get("id")176177self._send(178AgentMessage(179sender=self.name,180receiver=worker_id,181message_type=MessageType.REQUEST,182content={"action": "execute_task", "task": subtask},183requires_response=True,184priority=subtask.get("priority", 0),185)186)187188def select_worker(self, subtask: Dict[str, Any]) -> str:189"""Select the best available worker for a subtask.190191Use when: the supervisor needs capability-aware routing with192load-balancing (fewest completed tasks chosen first).193"""194required_capability = subtask.get("type", "general")195196candidates = [197wid198for wid, info in self.workers.items()199if info["status"] == "available"200and required_capability in info["capabilities"]201]202203if not candidates:204candidates = [205wid206for wid, info in self.workers.items()207if info["status"] == "available"208]209210if not candidates:211raise ValueError("No available workers")212213return min(214candidates,215key=lambda w: self.workers[w]["metrics"]["tasks_completed"],216)217218def aggregate_results(219self, subtask_results: List[Dict[str, Any]]220) -> Dict[str, Any]:221"""Aggregate results from completed subtasks."""222summaries = [223r.get("summary", "")224for r in subtask_results225if r.get("success")226]227successful = sum(2281 for r in subtask_results if r.get("success", False)229)230quality = successful / len(subtask_results) if subtask_results else 0.0231232return {233"results": subtask_results,234"summary": " | ".join(summaries),235"quality_score": quality,236}237238def run_workflow(self, task: Dict[str, Any]) -> Dict[str, Any]:239"""Execute a complete workflow with supervision.240241Use when: running an end-to-end supervised pipeline that decomposes242a task, assigns subtasks, collects results, and aggregates them.243244Note: This is a synchronous simulation. Workers do not execute245asynchronously — each subtask is simulated inline. In production,246replace ``_simulate_worker_response`` with actual async worker247execution and message passing.248"""249subtasks = self.decompose_task(task)250251results: List[Dict[str, Any]] = []252for subtask in subtasks:253worker = self.select_worker(subtask)254self.assign_task(subtask, worker)255256# Simulate worker executing and responding257response = self._simulate_worker_response(worker, subtask)258self.communication.send(259AgentMessage(260sender=worker,261receiver=self.name,262message_type=MessageType.RESPONSE,263content=response,264)265)266self.workers[worker]["status"] = "available"267self.workers[worker]["metrics"]["tasks_completed"] += 1268269messages = self.communication.receive(self.name)270for msg in messages:271if msg.message_type == MessageType.RESPONSE:272results.append(msg.content)273274final_result = self.aggregate_results(results)275276return {277"task": task,278"subtask_results": results,279"final_result": final_result,280"success": final_result["quality_score"] >= 0.8,281}282283def _simulate_worker_response(284self, worker_id: str, subtask: Dict[str, Any]285) -> Dict[str, Any]:286"""Simulate a worker completing a subtask.287288In production, replace with actual agent execution that sends289the subtask to a worker process and awaits a real response.290"""291return {292"success": True,293"summary": f"{worker_id} completed: {subtask.get('description', subtask.get('type', 'task'))}",294"worker": worker_id,295"subtask_type": subtask.get("type"),296}297298def _send(self, message: AgentMessage) -> None:299"""Send message through the communication channel."""300self.communication.send(message)301302303# ---------------------------------------------------------------------------304# Handoff Protocol305# ---------------------------------------------------------------------------306307308class HandoffProtocol:309"""Protocol for agent-to-agent handoffs.310311Use when: implementing peer-to-peer or swarm patterns where agents312transfer control and task state to one another.313"""314315def __init__(self, communication: AgentCommunication) -> None:316self.communication = communication317318def create_handoff(319self,320from_agent: str,321to_agent: str,322context: Dict[str, Any],323reason: str,324) -> AgentMessage:325"""Create a handoff message with transferred context."""326return AgentMessage(327sender=from_agent,328receiver=to_agent,329message_type=MessageType.HANDOVER,330content={331"handoff_reason": reason,332"transferred_context": context,333"handoff_timestamp": time.time(),334},335priority=1,336)337338def accept_handoff(self, agent_id: str) -> Optional[AgentMessage]:339"""Accept the first pending handoff for an agent, if any."""340messages = self.communication.receive(agent_id)341342for msg in messages:343if msg.message_type == MessageType.HANDOVER:344return msg345346return None347348def transfer_with_state(349self,350from_agent: str,351to_agent: str,352state: Dict[str, Any],353task: Dict[str, Any],354) -> bool:355"""Transfer task state from one agent to another.356357Use when: a handoff must carry full task state and progress so the358receiving agent can resume without re-deriving context.359360Returns True if the receiving agent acknowledged the handoff.361"""362handoff = self.create_handoff(363from_agent=from_agent,364to_agent=to_agent,365context={366"task_state": state,367"task_details": task,368"progress": state.get("progress", 0),369},370reason="task_transfer",371)372373self.communication.send(handoff)374375# In production, replace sleep with async await + timeout376time.sleep(0.1)377ack = self.communication.receive(from_agent)378379return any(380m.message_type == MessageType.RESPONSE381and m.content.get("status") == "handoff_received"382for m in ack383)384385386# ---------------------------------------------------------------------------387# Consensus Mechanism388# ---------------------------------------------------------------------------389390391class ConsensusManager:392"""Manager for multi-agent consensus building.393394Use when: multiple agents must vote on a decision and the system needs395weighted consensus that accounts for confidence and expertise rather396than naive majority voting.397"""398399def __init__(self) -> None:400self.votes: Dict[str, List[Dict[str, Any]]] = {}401self.debates: Dict[str, List[Dict[str, Any]]] = {}402403def initiate_vote(404self, topic_id: str, agents: List[str], options: List[str]405) -> None:406"""Initiate a voting round on a topic."""407self.votes[topic_id] = [408{409"agent": agent,410"topic": topic_id,411"options": options,412"status": "pending",413}414for agent in agents415]416417def submit_vote(418self,419topic_id: str,420agent_id: str,421selection: str,422confidence: float,423) -> None:424"""Submit a vote for a topic with a confidence weight."""425if topic_id not in self.votes:426raise ValueError(f"Unknown topic: {topic_id}")427428for vote in self.votes[topic_id]:429if vote["agent"] == agent_id:430vote["status"] = "cast"431vote["selection"] = selection432vote["confidence"] = confidence433break434435def calculate_weighted_consensus(self, topic_id: str) -> Dict[str, Any]:436"""Calculate weighted consensus from cast votes.437438Use when: votes are in and the system needs to determine a winner439weighted by each agent's confidence rather than simple majority.440Weight = confidence * expertise_factor.441"""442if topic_id not in self.votes:443raise ValueError(f"Unknown topic: {topic_id}")444445votes = [446v for v in self.votes[topic_id] if v.get("status") == "cast"447]448449if not votes:450return {"status": "no_votes", "result": None}451452# Group by selection453selections: Dict[str, List[Dict[str, Any]]] = {}454for vote in votes:455selection = vote["selection"]456if selection not in selections:457selections[selection] = []458selections[selection].append(vote)459460# Calculate weighted score for each selection461results: Dict[str, Dict[str, Any]] = {}462for selection, selection_votes in selections.items():463weighted_sum = sum(v["confidence"] for v in selection_votes)464avg_confidence = (465weighted_sum / len(selection_votes) if selection_votes else 0.0466)467results[selection] = {468"weighted_score": weighted_sum,469"avg_confidence": avg_confidence,470"vote_count": len(selection_votes),471}472473winner = max(results.keys(), key=lambda s: results[s]["weighted_score"])474475return {476"status": "complete",477"result": winner,478"details": results,479"consensus_strength": (480results[winner]["weighted_score"] / len(votes) if votes else 0.0481),482}483484485# ---------------------------------------------------------------------------486# Failure Handling487# ---------------------------------------------------------------------------488489490class AgentFailureHandler:491"""Handler for agent failures in multi-agent systems.492493Use when: agents may fail and the system needs retry logic with494exponential backoff, circuit breakers, and automatic rerouting to495backup agents.496"""497498def __init__(499self,500communication: AgentCommunication,501max_retries: int = 3,502) -> None:503self.communication = communication504self.max_retries = max_retries505self.failure_counts: Dict[str, int] = {}506self.circuit_breakers: Dict[str, float] = {} # agent -> unlock time507508def handle_failure(509self, agent_id: str, task_id: str, error: str510) -> Dict[str, Any]:511"""Handle a failure from an agent.512513Use when: an agent reports an error and the system must decide514whether to retry (with backoff) or reroute to a backup agent.515"""516self.failure_counts[agent_id] = (517self.failure_counts.get(agent_id, 0) + 1518)519520if self.failure_counts[agent_id] >= self.max_retries:521self._activate_circuit_breaker(agent_id)522return {523"action": "reroute",524"reason": "circuit_breaker_activated",525"alternative": self._find_alternative_agent(agent_id),526}527528return {529"action": "retry",530"reason": error,531"retry_count": self.failure_counts[agent_id],532"delay": min(2 ** self.failure_counts[agent_id], 60),533}534535def _activate_circuit_breaker(self, agent_id: str) -> None:536"""Temporarily disable an agent (1-minute cooldown)."""537self.circuit_breakers[agent_id] = time.time() + 60538539def _find_alternative_agent(self, failed_agent: str) -> str:540"""Find an alternative agent to handle the task.541542In production, check agent capabilities and availability.543"""544return "default_backup_agent"545546def is_available(self, agent_id: str) -> bool:547"""Check if an agent is available (circuit breaker not active)."""548if agent_id in self.circuit_breakers:549if time.time() < self.circuit_breakers[agent_id]:550return False551del self.circuit_breakers[agent_id]552self.failure_counts[agent_id] = 0553return True554555def record_success(self, agent_id: str) -> None:556"""Record a successful task completion, resetting failure count."""557self.failure_counts[agent_id] = 0558559560# ---------------------------------------------------------------------------561# Demo / CLI entry point562# ---------------------------------------------------------------------------563564565if __name__ == "__main__":566print("=== Multi-Agent Coordination Demo ===\n")567568# 1. Communication channel569comm = AgentCommunication()570print("1. Created communication channel")571572# 2. Supervisor pattern573supervisor = SupervisorAgent("supervisor", comm)574supervisor.register_worker("researcher", ["search", "analyze"])575supervisor.register_worker("writer", ["synthesize", "draft"])576print("2. Registered supervisor with 2 workers: researcher, writer")577578# 3. Handoff protocol579protocol = HandoffProtocol(comm)580handoff_msg = protocol.create_handoff(581from_agent="researcher",582to_agent="writer",583context={"findings": ["item1", "item2"]},584reason="research_complete",585)586comm.send(handoff_msg)587received = protocol.accept_handoff("writer")588print(589f"3. Handoff from researcher -> writer: "590f"{'accepted' if received else 'none pending'}"591)592593# 4. Consensus mechanism594consensus = ConsensusManager()595consensus.initiate_vote("best_approach", ["agent_a", "agent_b", "agent_c"], ["A", "B"])596consensus.submit_vote("best_approach", "agent_a", "A", confidence=0.9)597consensus.submit_vote("best_approach", "agent_b", "B", confidence=0.6)598consensus.submit_vote("best_approach", "agent_c", "A", confidence=0.8)599result = consensus.calculate_weighted_consensus("best_approach")600print(601f"4. Consensus result: {result['result']} "602f"(strength: {result['consensus_strength']:.2f})"603)604605# 5. Failure handling606handler = AgentFailureHandler(comm, max_retries=3)607action1 = handler.handle_failure("flaky_agent", "task_1", "timeout")608action2 = handler.handle_failure("flaky_agent", "task_1", "timeout")609action3 = handler.handle_failure("flaky_agent", "task_1", "timeout")610print(f"5. After 3 failures: action={action3['action']}")611print(f" Agent available? {handler.is_available('flaky_agent')}")612613print("\n=== Demo Complete ===")614