Sub-Agents
Decompose work across multiple specialized agents with a visible delegation log.
"""MS Agent Framework agent backing the Sub-Agents demo.Mirrors langgraph-python/src/agents/subagents.py andgoogle-adk/src/agents/subagents_agent.py:A top-level supervisor LLM orchestrates three specialized sub-agentsexposed as tools: - `research_agent` — gathers facts - `writing_agent` — drafts prose - `critique_agent` — reviews draftsEach sub-agent is a real `agent_framework.Agent` with its own systemprompt. Each delegation appends an entry to the `delegations` slot inAG-UI shared state via `state_update(...)`, so the UI can render alive delegation log via `useAgent`.Subagent invocation contract: each delegation tool returns`state_update(...)` containing the FULL updated `delegations` list. Weread the prior list out of a per-request `ContextVar` populated by an`agent_middleware` that captures the AG-UI session metadata(specifically `current_state`, which the AG-UI runtime stuffs into`session.metadata` on every turn) before the supervisor runs."""from __future__ import annotationsimport asyncioimport contextvarsimport jsonimport loggingimport threadingimport uuidfrom collections.abc import AsyncGenerator, Awaitable, Callablefrom textwrap import dedentfrom typing import Annotated, Anyfrom ag_ui.core import BaseEventfrom agent_framework import ( Agent, AgentContext, BaseChatClient, Content, agent_middleware, tool,)from agent_framework_ag_ui import AgentFrameworkAgent, state_updatefrom pydantic import Fieldlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# State schema — `delegations` is rendered as a live log in the UI.# ---------------------------------------------------------------------------STATE_SCHEMA: dict[str, object] = { "delegations": { "type": "array", "description": ( "Append-only log of supervisor -> sub-agent delegations. " "Each entry is a Delegation = " "{id, sub_agent, task, status, result}." ), "items": { "type": "object", "properties": { "id": {"type": "string"}, "sub_agent": {"type": "string"}, "task": {"type": "string"}, "status": {"type": "string"}, "result": {"type": "string"}, }, }, }}# ---------------------------------------------------------------------------# Per-request current_state bridge## Tools cannot directly receive `current_state` from the AG-UI runtime,# but `agent_middleware` runs once per agent invocation with full# session context. We snapshot the latest `delegations` list into a# ContextVar before `call_next()`, so each delegation tool (running in# the same task / contextvar scope) can read it back, append, and# return the FULL list via `state_update`.# ---------------------------------------------------------------------------_current_delegations: contextvars.ContextVar[list[dict[str, Any]]] = ( contextvars.ContextVar("ms_subagents_current_delegations", default=[]))def _extract_delegations(raw: Any) -> list[dict[str, Any]]: """Pull a clean delegations list out of session metadata. `session.metadata["current_state"]` is JSON-serialized by the AG-UI runtime (see `_build_safe_metadata`) so we tolerate either a plain dict or its string form. """ payload: Any = raw if isinstance(payload, str): try: payload = json.loads(payload) except json.JSONDecodeError: logger.warning( "subagents: current_state was not valid JSON; " "starting from empty delegations list" ) return [] if not isinstance(payload, dict): return [] delegations = payload.get("delegations") if not isinstance(delegations, list): return [] return [dict(d) for d in delegations if isinstance(d, dict)]@agent_middlewareasync def capture_current_state( context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None: """Snapshot `delegations` from session metadata into a ContextVar.""" snapshot: list[dict[str, Any]] = [] session = context.session metadata = getattr(session, "metadata", None) if session else None if isinstance(metadata, dict): snapshot = _extract_delegations(metadata.get("current_state")) token = _current_delegations.set(snapshot) try: await call_next() finally: _current_delegations.reset(token)# ---------------------------------------------------------------------------# Sub-agent factory## Each sub-agent is a full `Agent(...)` with its own system prompt.# They share the chat client with the supervisor but otherwise have no# shared memory or tools — the supervisor only sees their final text.# ---------------------------------------------------------------------------# Each sub-agent is a full-fledged `Agent(...)` with its own system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees their return value (final text content)._RESEARCH_INSTRUCTIONS = ( "You are a research sub-agent. Given a topic, produce a concise " "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_INSTRUCTIONS = ( "You are a writing sub-agent. Given a brief and optional source " "facts, produce a polished 1-paragraph draft. Be clear and " "concrete. No preamble.")_CRITIQUE_INSTRUCTIONS = ( "You are an editorial critique sub-agent. Given a draft, give " "2-3 crisp, actionable critiques. No preamble.")def _make_sub_agent(chat_client: BaseChatClient, name: str, instructions: str) -> Agent: return Agent( client=chat_client, name=name, instructions=instructions, tools=[], )# Module-level holder so the delegation tools can reach the# pre-built sub-agents without rebuilding them on every tool call.# Populated lazily by `create_subagents_agent(...)`._SUB_AGENTS: dict[str, Agent] = {}async def _invoke_sub_agent_async(sub_agent_name: str, task: str) -> str: """Run a sub-agent on `task` and return its final text content.""" agent = _SUB_AGENTS.get(sub_agent_name) if agent is None: raise RuntimeError( f"sub-agent '{sub_agent_name}' is not registered; call " "create_subagents_agent(...) first" ) response = await agent.run(task) text = (getattr(response, "text", "") or "").strip() if text: return text # Fall back to scanning messages — `Agent.run` always returns # an `AgentRunResponse`, but `.text` may be empty if the chat # client only emitted reasoning content or tool calls. messages = getattr(response, "messages", None) or [] for message in reversed(messages): for content in getattr(message, "contents", None) or []: content_text = getattr(content, "text", None) if content_text: fallback = str(content_text).strip() if fallback: return fallback raise RuntimeError(f"sub-agent '{sub_agent_name}' returned no text content")def _invoke_sub_agent(sub_agent_name: str, task: str) -> str: """Sync bridge: drive the async invocation from inside a tool callback. `@tool` reflects on the underlying callable's signature, so the tool entry points are sync. The supervisor's chat client typically runs inside an existing event loop (FastAPI request handler), so `asyncio.run` would refuse — fall through to a worker thread that spins up its own loop. """ try: return asyncio.run(_invoke_sub_agent_async(sub_agent_name, task)) except RuntimeError as exc: if "asyncio.run() cannot be called" not in str(exc): raise container: dict[str, Any] = {} def _runner() -> None: try: container["result"] = asyncio.run( _invoke_sub_agent_async(sub_agent_name, task) ) except Exception as inner: # pragma: no cover -- defensive container["error"] = inner worker = threading.Thread(target=_runner, daemon=True) worker.start() worker.join() if "error" in container: raise container["error"] return str(container["result"])def _delegate(sub_agent_name: str, task: str) -> Content: """Common delegation flow: invoke sub-agent, append entry, push state.""" delegations = list(_current_delegations.get()) entry_id = str(uuid.uuid4()) try: result_text = _invoke_sub_agent(sub_agent_name, task) except Exception as exc: logger.exception("subagents: %s delegation failed", sub_agent_name) delegations.append( { "id": entry_id, "sub_agent": sub_agent_name, "task": task, "status": "failed", # Surface only the exception class — sub-agent error # messages can leak chat client URLs / quota details # in deployed environments. "result": (f"sub-agent error: {exc.__class__.__name__}"), } ) # Mirror the contextvar so a follow-up sub-agent call within the # same supervisor turn sees this entry. _current_delegations.set(delegations) return state_update( text=(f"{sub_agent_name} failed; surfaced in delegation log."), state={"delegations": delegations}, ) delegations.append( { "id": entry_id, "sub_agent": sub_agent_name, "task": task, "status": "completed", "result": result_text, } ) _current_delegations.set(delegations) return state_update( text=result_text, state={"delegations": delegations}, )# ---------------------------------------------------------------------------# Supervisor delegation tools — each one wraps a sub-agent invocation.# ---------------------------------------------------------------------------# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"# these tools to delegate work; each call synchronously runs the# matching sub-agent (via `_delegate`), appends the entry to the# `delegations` shared-state slot, and returns a `state_update(...)` so# the AG-UI emitter pushes a deterministic StateSnapshotEvent — both# surfacing the result to the supervisor and refreshing the live# delegation log in the UI.@tool( name="research_agent", description=( "Delegate a research task to the research sub-agent. Use for " "gathering facts, background, definitions, statistics. Returns " "a bulleted list of key facts." ),)def research_agent( task: Annotated[ str, Field(description="The research question or topic to investigate."), ],) -> Content: """Delegate a research task to the research sub-agent.""" return _delegate("research_agent", task)@tool( name="writing_agent", description=( "Delegate a drafting task to the writing sub-agent. Use for " "producing a polished paragraph, draft, or summary. Pass any " "relevant facts from prior research inside `task`." ),)def writing_agent( task: Annotated[ str, Field( description=( "The drafting brief, including any relevant source " "facts the writer should weave in." ) ), ],) -> Content: """Delegate a drafting task to the writing sub-agent.""" return _delegate("writing_agent", task)@tool( name="critique_agent", description=( "Delegate a critique task to the critique sub-agent. Use for " "reviewing a draft and suggesting concrete improvements." ),)def critique_agent( task: Annotated[ str, Field( description=( "The draft text to critique. Provide the full text -- " "the critique sub-agent has no other context." ) ), ],) -> Content: """Delegate a critique task to the critique sub-agent.""" return _delegate("critique_agent", task)# ---------------------------------------------------------------------------# Supervisor agent factory# ---------------------------------------------------------------------------SUPERVISOR_PROMPT = dedent( """ You are a supervisor agent that coordinates three specialized sub-agents to produce high-quality deliverables. Available sub-agents (call them as tools): - research_agent: gathers facts on a topic. - writing_agent: turns facts + a brief into a polished draft. - critique_agent: reviews a draft and suggests improvements. For every non-trivial user request, delegate in sequence: research_agent -> writing_agent -> critique_agent. IMPORTANT: call EACH sub-agent EXACTLY ONCE per user request. After critique_agent returns, do NOT call any sub-agent again -- return a concise final answer to the user that incorporates the critique. Pass the relevant facts/draft through the `task` argument of each tool. Keep your own messages short — explain the plan once, delegate, then return a concise summary once done. The UI shows the user a live log of every sub-agent delegation. """).strip()def _tool_call_ids(message: dict[str, Any]) -> set[str]: tool_calls = message.get("tool_calls") or message.get("toolCalls") or [] if not isinstance(tool_calls, list): return set() ids: set[str] = set() for call in tool_calls: if isinstance(call, dict) and isinstance(call.get("id"), str): ids.add(call["id"]) return idsdef _tool_result_ids(messages: list[dict[str, Any]], start_index: int) -> set[str]: ids: set[str] = set() for message in messages[start_index + 1 :]: if message.get("role") == "user": break if message.get("role") != "tool": continue call_id = message.get("tool_call_id") or message.get("toolCallId") if isinstance(call_id, str): ids.add(call_id) return idsdef _drop_orphan_assistant_tool_calls(messages: Any) -> list[dict[str, Any]]: """Remove historical assistant tool calls that lack tool result messages. The MS Agent Framework AG-UI bridge can preserve the assistant tool-call snapshot while omitting the corresponding tool-role results. OpenAI rejects that history on the next turn, so keep the final assistant text/state but omit malformed historical tool-call entries before the supervisor runs. """ if not isinstance(messages, list): return [] clean: list[dict[str, Any]] = [] for index, message in enumerate(messages): if not isinstance(message, dict): continue if message.get("role") == "assistant": call_ids = _tool_call_ids(message) if call_ids and not call_ids.issubset(_tool_result_ids(messages, index)): continue clean.append(message) return cleanclass SubagentsFrameworkAgent(AgentFrameworkAgent): """AgentFrameworkAgent that removes invalid historical tool-call snapshots.""" async def run( # type: ignore[override] self, input_data: dict[str, Any], ) -> AsyncGenerator[BaseEvent, None]: patched_input = dict(input_data) patched_input["messages"] = _drop_orphan_assistant_tool_calls( input_data.get("messages") ) async for event in super().run(patched_input): yield eventdef create_subagents_agent(chat_client: BaseChatClient) -> SubagentsFrameworkAgent: """Instantiate the Sub-Agents demo supervisor.""" # Build (and cache) the three sub-agents so the @tool entry points # can find them via the module-level registry. _SUB_AGENTS["research_agent"] = _make_sub_agent( chat_client, "research_agent", _RESEARCH_INSTRUCTIONS ) _SUB_AGENTS["writing_agent"] = _make_sub_agent( chat_client, "writing_agent", _WRITING_INSTRUCTIONS ) _SUB_AGENTS["critique_agent"] = _make_sub_agent( chat_client, "critique_agent", _CRITIQUE_INSTRUCTIONS ) base_agent = Agent( client=chat_client, name="subagents_supervisor", instructions=SUPERVISOR_PROMPT, tools=[research_agent, writing_agent, critique_agent], default_options={"allow_multiple_tool_calls": False}, middleware=[capture_current_state], ) return SubagentsFrameworkAgent( agent=base_agent, name="CopilotKitMSAgentSubagentsSupervisor", description=( "Supervisor agent. Delegates research / writing / critique " "to specialized sub-agents and surfaces the live " "delegation log to the UI via shared state." ), state_schema=STATE_SCHEMA, require_confirmation=False, )What is this?#
Sub-agents are the canonical multi-agent pattern: a top-level supervisor LLM orchestrates one or more specialized sub-agents by exposing each of them as a tool. The supervisor decides what to delegate, the sub-agents do their narrow job, and their results flow back up to the supervisor's next step.
This is fundamentally the same shape as tool-calling, but each "tool" is itself a full-blown agent with its own system prompt and (often) its own tools, memory, and model.
When should I use this?#
Reach for sub-agents when a task has distinct specialized sub-tasks that each benefit from their own focus:
- Research → Write → Critique pipelines, where each stage needs a different system prompt and temperature.
- Router + specialists, where one agent classifies the request and dispatches to the right expert.
- Divide-and-conquer — any problem that fits cleanly into parallel or sequential sub-problems.
The example below uses the Research → Write → Critique shape as the canonical example.
Setting up sub-agents#
Each sub-agent is a full create_agent(...) call with its own model,
its own system prompt, and (optionally) its own tools. They don't share
memory or tools with the supervisor; the supervisor only ever sees
what the sub-agent returns.
from __future__ import annotationsimport asyncioimport contextvarsimport jsonimport loggingimport threadingimport uuidfrom collections.abc import AsyncGenerator, Awaitable, Callablefrom textwrap import dedentfrom typing import Annotated, Anyfrom ag_ui.core import BaseEventfrom agent_framework import ( Agent, AgentContext, BaseChatClient, Content, agent_middleware, tool,)from agent_framework_ag_ui import AgentFrameworkAgent, state_updatefrom pydantic import Fieldlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# State schema — `delegations` is rendered as a live log in the UI.# ---------------------------------------------------------------------------STATE_SCHEMA: dict[str, object] = { "delegations": { "type": "array", "description": ( "Append-only log of supervisor -> sub-agent delegations. " "Each entry is a Delegation = " "{id, sub_agent, task, status, result}." ), "items": { "type": "object", "properties": { "id": {"type": "string"}, "sub_agent": {"type": "string"}, "task": {"type": "string"}, "status": {"type": "string"}, "result": {"type": "string"}, }, }, }}# ---------------------------------------------------------------------------# Per-request current_state bridge## Tools cannot directly receive `current_state` from the AG-UI runtime,# but `agent_middleware` runs once per agent invocation with full# session context. We snapshot the latest `delegations` list into a# ContextVar before `call_next()`, so each delegation tool (running in# the same task / contextvar scope) can read it back, append, and# return the FULL list via `state_update`.# ---------------------------------------------------------------------------_current_delegations: contextvars.ContextVar[list[dict[str, Any]]] = ( contextvars.ContextVar("ms_subagents_current_delegations", default=[]))def _extract_delegations(raw: Any) -> list[dict[str, Any]]: """Pull a clean delegations list out of session metadata. `session.metadata["current_state"]` is JSON-serialized by the AG-UI runtime (see `_build_safe_metadata`) so we tolerate either a plain dict or its string form. """ payload: Any = raw if isinstance(payload, str): try: payload = json.loads(payload) except json.JSONDecodeError: logger.warning( "subagents: current_state was not valid JSON; " "starting from empty delegations list" ) return [] if not isinstance(payload, dict): return [] delegations = payload.get("delegations") if not isinstance(delegations, list): return [] return [dict(d) for d in delegations if isinstance(d, dict)]@agent_middlewareasync def capture_current_state( context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None: """Snapshot `delegations` from session metadata into a ContextVar.""" snapshot: list[dict[str, Any]] = [] session = context.session metadata = getattr(session, "metadata", None) if session else None if isinstance(metadata, dict): snapshot = _extract_delegations(metadata.get("current_state")) token = _current_delegations.set(snapshot) try: await call_next() finally: _current_delegations.reset(token)# ---------------------------------------------------------------------------# Sub-agent factory## Each sub-agent is a full `Agent(...)` with its own system prompt.# They share the chat client with the supervisor but otherwise have no# shared memory or tools — the supervisor only sees their final text.# ---------------------------------------------------------------------------# Each sub-agent is a full-fledged `Agent(...)` with its own system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees their return value (final text content)._RESEARCH_INSTRUCTIONS = ( "You are a research sub-agent. Given a topic, produce a concise " "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_INSTRUCTIONS = ( "You are a writing sub-agent. Given a brief and optional source " "facts, produce a polished 1-paragraph draft. Be clear and " "concrete. No preamble.")_CRITIQUE_INSTRUCTIONS = ( "You are an editorial critique sub-agent. Given a draft, give " "2-3 crisp, actionable critiques. No preamble.")def _make_sub_agent(chat_client: BaseChatClient, name: str, instructions: str) -> Agent: return Agent( client=chat_client, name=name, instructions=instructions, tools=[], )Keep sub-agent system prompts narrow and focused. The point of this pattern is that each one does one thing well. If a sub-agent needs to know the whole user context to do its job, that's a signal the boundary is wrong.
Exposing sub-agents as tools#
The supervisor delegates by calling tools. Each tool is a thin wrapper
around sub_agent.invoke(...) that:
- Runs the sub-agent synchronously on the supplied
taskstring. - Records the delegation into a
delegationsslot in shared agent state (so the UI can render a live log). - Returns the sub-agent's final message as a
ToolMessage, which the supervisor sees as a normal tool result on its next turn.
from __future__ import annotationsimport asyncioimport contextvarsimport jsonimport loggingimport threadingimport uuidfrom collections.abc import AsyncGenerator, Awaitable, Callablefrom textwrap import dedentfrom typing import Annotated, Anyfrom ag_ui.core import BaseEventfrom agent_framework import ( Agent, AgentContext, BaseChatClient, Content, agent_middleware, tool,)from agent_framework_ag_ui import AgentFrameworkAgent, state_updatefrom pydantic import Fieldlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# State schema — `delegations` is rendered as a live log in the UI.# ---------------------------------------------------------------------------STATE_SCHEMA: dict[str, object] = { "delegations": { "type": "array", "description": ( "Append-only log of supervisor -> sub-agent delegations. " "Each entry is a Delegation = " "{id, sub_agent, task, status, result}." ), "items": { "type": "object", "properties": { "id": {"type": "string"}, "sub_agent": {"type": "string"}, "task": {"type": "string"}, "status": {"type": "string"}, "result": {"type": "string"}, }, }, }}# ---------------------------------------------------------------------------# Per-request current_state bridge## Tools cannot directly receive `current_state` from the AG-UI runtime,# but `agent_middleware` runs once per agent invocation with full# session context. We snapshot the latest `delegations` list into a# ContextVar before `call_next()`, so each delegation tool (running in# the same task / contextvar scope) can read it back, append, and# return the FULL list via `state_update`.# ---------------------------------------------------------------------------_current_delegations: contextvars.ContextVar[list[dict[str, Any]]] = ( contextvars.ContextVar("ms_subagents_current_delegations", default=[]))def _extract_delegations(raw: Any) -> list[dict[str, Any]]: """Pull a clean delegations list out of session metadata. `session.metadata["current_state"]` is JSON-serialized by the AG-UI runtime (see `_build_safe_metadata`) so we tolerate either a plain dict or its string form. """ payload: Any = raw if isinstance(payload, str): try: payload = json.loads(payload) except json.JSONDecodeError: logger.warning( "subagents: current_state was not valid JSON; " "starting from empty delegations list" ) return [] if not isinstance(payload, dict): return [] delegations = payload.get("delegations") if not isinstance(delegations, list): return [] return [dict(d) for d in delegations if isinstance(d, dict)]@agent_middlewareasync def capture_current_state( context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None: """Snapshot `delegations` from session metadata into a ContextVar.""" snapshot: list[dict[str, Any]] = [] session = context.session metadata = getattr(session, "metadata", None) if session else None if isinstance(metadata, dict): snapshot = _extract_delegations(metadata.get("current_state")) token = _current_delegations.set(snapshot) try: await call_next() finally: _current_delegations.reset(token)# ---------------------------------------------------------------------------# Sub-agent factory## Each sub-agent is a full `Agent(...)` with its own system prompt.# They share the chat client with the supervisor but otherwise have no# shared memory or tools — the supervisor only sees their final text.# ---------------------------------------------------------------------------# Each sub-agent is a full-fledged `Agent(...)` with its own system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees their return value (final text content)._RESEARCH_INSTRUCTIONS = ( "You are a research sub-agent. Given a topic, produce a concise " "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_INSTRUCTIONS = ( "You are a writing sub-agent. Given a brief and optional source " "facts, produce a polished 1-paragraph draft. Be clear and " "concrete. No preamble.")_CRITIQUE_INSTRUCTIONS = ( "You are an editorial critique sub-agent. Given a draft, give " "2-3 crisp, actionable critiques. No preamble.")def _make_sub_agent(chat_client: BaseChatClient, name: str, instructions: str) -> Agent: return Agent( client=chat_client, name=name, instructions=instructions, tools=[], )# Module-level holder so the delegation tools can reach the# pre-built sub-agents without rebuilding them on every tool call.# Populated lazily by `create_subagents_agent(...)`._SUB_AGENTS: dict[str, Agent] = {}async def _invoke_sub_agent_async(sub_agent_name: str, task: str) -> str: """Run a sub-agent on `task` and return its final text content.""" agent = _SUB_AGENTS.get(sub_agent_name) if agent is None: raise RuntimeError( f"sub-agent '{sub_agent_name}' is not registered; call " "create_subagents_agent(...) first" ) response = await agent.run(task) text = (getattr(response, "text", "") or "").strip() if text: return text # Fall back to scanning messages — `Agent.run` always returns # an `AgentRunResponse`, but `.text` may be empty if the chat # client only emitted reasoning content or tool calls. messages = getattr(response, "messages", None) or [] for message in reversed(messages): for content in getattr(message, "contents", None) or []: content_text = getattr(content, "text", None) if content_text: fallback = str(content_text).strip() if fallback: return fallback raise RuntimeError(f"sub-agent '{sub_agent_name}' returned no text content")def _invoke_sub_agent(sub_agent_name: str, task: str) -> str: """Sync bridge: drive the async invocation from inside a tool callback. `@tool` reflects on the underlying callable's signature, so the tool entry points are sync. The supervisor's chat client typically runs inside an existing event loop (FastAPI request handler), so `asyncio.run` would refuse — fall through to a worker thread that spins up its own loop. """ try: return asyncio.run(_invoke_sub_agent_async(sub_agent_name, task)) except RuntimeError as exc: if "asyncio.run() cannot be called" not in str(exc): raise container: dict[str, Any] = {} def _runner() -> None: try: container["result"] = asyncio.run( _invoke_sub_agent_async(sub_agent_name, task) ) except Exception as inner: # pragma: no cover -- defensive container["error"] = inner worker = threading.Thread(target=_runner, daemon=True) worker.start() worker.join() if "error" in container: raise container["error"] return str(container["result"])def _delegate(sub_agent_name: str, task: str) -> Content: """Common delegation flow: invoke sub-agent, append entry, push state.""" delegations = list(_current_delegations.get()) entry_id = str(uuid.uuid4()) try: result_text = _invoke_sub_agent(sub_agent_name, task) except Exception as exc: logger.exception("subagents: %s delegation failed", sub_agent_name) delegations.append( { "id": entry_id, "sub_agent": sub_agent_name, "task": task, "status": "failed", # Surface only the exception class — sub-agent error # messages can leak chat client URLs / quota details # in deployed environments. "result": (f"sub-agent error: {exc.__class__.__name__}"), } ) # Mirror the contextvar so a follow-up sub-agent call within the # same supervisor turn sees this entry. _current_delegations.set(delegations) return state_update( text=(f"{sub_agent_name} failed; surfaced in delegation log."), state={"delegations": delegations}, ) delegations.append( { "id": entry_id, "sub_agent": sub_agent_name, "task": task, "status": "completed", "result": result_text, } ) _current_delegations.set(delegations) return state_update( text=result_text, state={"delegations": delegations}, )# ---------------------------------------------------------------------------# Supervisor delegation tools — each one wraps a sub-agent invocation.# ---------------------------------------------------------------------------# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"# these tools to delegate work; each call synchronously runs the# matching sub-agent (via `_delegate`), appends the entry to the# `delegations` shared-state slot, and returns a `state_update(...)` so# the AG-UI emitter pushes a deterministic StateSnapshotEvent — both# surfacing the result to the supervisor and refreshing the live# delegation log in the UI.@tool( name="research_agent", description=( "Delegate a research task to the research sub-agent. Use for " "gathering facts, background, definitions, statistics. Returns " "a bulleted list of key facts." ),)def research_agent( task: Annotated[ str, Field(description="The research question or topic to investigate."), ],) -> Content: """Delegate a research task to the research sub-agent.""" return _delegate("research_agent", task)@tool( name="writing_agent", description=( "Delegate a drafting task to the writing sub-agent. Use for " "producing a polished paragraph, draft, or summary. Pass any " "relevant facts from prior research inside `task`." ),)def writing_agent( task: Annotated[ str, Field( description=( "The drafting brief, including any relevant source " "facts the writer should weave in." ) ), ],) -> Content: """Delegate a drafting task to the writing sub-agent.""" return _delegate("writing_agent", task)@tool( name="critique_agent", description=( "Delegate a critique task to the critique sub-agent. Use for " "reviewing a draft and suggesting concrete improvements." ),)def critique_agent( task: Annotated[ str, Field( description=( "The draft text to critique. Provide the full text -- " "the critique sub-agent has no other context." ) ), ],) -> Content: """Delegate a critique task to the critique sub-agent.""" return _delegate("critique_agent", task)This is where CopilotKit's shared-state channel earns its keep: the
supervisor's tool calls mutate delegations as they happen, and the
frontend renders every new entry live.
Rendering a live delegation log#
On the frontend, the delegation log is just a reactive render of the
delegations slot. Subscribe with useAgent({ updates: [UseAgentUpdate.OnStateChanged, UseAgentUpdate.OnRunStatusChanged] }),
read agent.state.delegations, and render one card per entry.
/** * Live delegation log — renders the `delegations` slot of agent state. * * Each entry corresponds to one invocation of a sub-agent. The list * grows in real time as the supervisor fans work out to its children. * The parent header shows how many sub-agents have been called and * whether the supervisor is still running. */// Fixed list of the three sub-agent roles the supervisor can call.// Rendered as always-visible indicator chips at the top of the log// (regardless of whether the supervisor has delegated yet) so the user// — and the e2e suite — can see at a glance which sub-agents exist and// which are currently active.const INDICATOR_ROLES: ReadonlyArray<{ role: "researcher" | "writer" | "critic"; subAgent: SubAgentName;}> = [ { role: "researcher", subAgent: "research_agent" }, { role: "writer", subAgent: "writing_agent" }, { role: "critic", subAgent: "critique_agent" },];export function DelegationLog({ delegations, isRunning }: DelegationLogProps) { const calledRoles = new Set<SubAgentName>( delegations.map((d) => d.sub_agent), ); return ( <div data-testid="delegation-log" className="w-full h-full flex flex-col bg-white rounded-2xl shadow-sm border border-[#DBDBE5] overflow-hidden" > <div className="flex items-center justify-between px-6 py-3 border-b border-[#E9E9EF] bg-[#FAFAFC]"> <div className="flex items-center gap-3"> <span className="text-lg font-semibold text-[#010507]"> Sub-agent delegations </span> {isRunning && ( <span data-testid="supervisor-running" className="inline-flex items-center gap-1.5 px-2 py-0.5 rounded-full border border-[#BEC2FF] bg-[#BEC2FF1A] text-[#010507] text-[10px] font-semibold uppercase tracking-[0.12em]" > <span className="w-1.5 h-1.5 rounded-full bg-[#010507] animate-pulse" /> Supervisor running </span> )} </div> <span data-testid="delegation-count" className="text-xs font-mono text-[#838389]" > {delegations.length} calls </span> </div> <div data-testid="subagent-indicators" className="flex items-center gap-2 border-b border-[#E9E9EF] bg-white px-6 py-2" > {INDICATOR_ROLES.map(({ role, subAgent }) => { const style = SUB_AGENT_STYLE[subAgent]; const fired = calledRoles.has(subAgent); return ( <span key={role} data-testid={`subagent-indicator-${role}`} data-role={role} data-fired={fired ? "true" : "false"} className={`inline-flex items-center gap-1 px-2 py-0.5 rounded-full text-[10px] font-semibold uppercase tracking-[0.1em] border ${style.color} ${ fired ? "" : "opacity-60" }`} > <span aria-hidden>{style.emoji}</span> <span>{style.label}</span> </span> ); })} </div> <div className="flex-1 overflow-y-auto p-4 space-y-3"> {delegations.length === 0 ? ( <p className="text-[#838389] italic text-sm"> Ask the supervisor to complete a task. Every sub-agent it calls will appear here. </p> ) : ( delegations.map((d, idx) => { const style = SUB_AGENT_STYLE[d.sub_agent]; return ( <div key={d.id} data-testid="delegation-entry" className="border border-[#E9E9EF] rounded-xl p-3 bg-[#FAFAFC]" > <div className="flex items-center justify-between mb-2"> <div className="flex items-center gap-2"> <span className="text-xs font-mono text-[#AFAFB7]"> #{idx + 1} </span> <span className={`inline-flex items-center gap-1 px-2 py-0.5 rounded-full text-[10px] font-semibold uppercase tracking-[0.1em] border ${style.color}`} > <span>{style.emoji}</span> <span>{style.label}</span> </span> </div> <span className="text-[10px] uppercase tracking-[0.12em] font-semibold text-[#189370]"> {d.status} </span> </div> <div className="text-xs text-[#57575B] mb-2"> <span className="font-semibold text-[#010507]">Task: </span> {d.task} </div> <div className="text-sm text-[#010507] whitespace-pre-wrap bg-white rounded-lg p-2.5 border border-[#E9E9EF]"> {d.result} </div> </div> ); }) )} </div> </div> );}The result: as the supervisor fans work out to its sub-agents, the log grows in real time, giving the user visibility into a process that would otherwise be a long opaque spinner.
Related#
- Shared State — the channel that makes the delegation log live.
- State streaming — stream individual sub-agent outputs token-by-token inside each log entry.
