CopilotKit

Sub-Agents

Decompose work across multiple specialized agents with a visible delegation log.


"""MS Agent Framework agent backing the Sub-Agents demo.Mirrors langgraph-python/src/agents/subagents.py andgoogle-adk/src/agents/subagents_agent.py:A top-level supervisor LLM orchestrates three specialized sub-agentsexposed as tools:  - `research_agent` — gathers facts  - `writing_agent`  — drafts prose  - `critique_agent` — reviews draftsEach sub-agent is a real `agent_framework.Agent` with its own systemprompt. Each delegation appends an entry to the `delegations` slot inAG-UI shared state via `state_update(...)`, so the UI can render alive delegation log via `useAgent`.Subagent invocation contract: each delegation tool returns`state_update(...)` containing the FULL updated `delegations` list. Weread the prior list out of a per-request `ContextVar` populated by an`agent_middleware` that captures the AG-UI session metadata(specifically `current_state`, which the AG-UI runtime stuffs into`session.metadata` on every turn) before the supervisor runs."""from __future__ import annotationsimport asyncioimport contextvarsimport jsonimport loggingimport threadingimport uuidfrom collections.abc import AsyncGenerator, Awaitable, Callablefrom textwrap import dedentfrom typing import Annotated, Anyfrom ag_ui.core import BaseEventfrom agent_framework import (    Agent,    AgentContext,    BaseChatClient,    Content,    agent_middleware,    tool,)from agent_framework_ag_ui import AgentFrameworkAgent, state_updatefrom pydantic import Fieldlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# State schema — `delegations` is rendered as a live log in the UI.# ---------------------------------------------------------------------------STATE_SCHEMA: dict[str, object] = {    "delegations": {        "type": "array",        "description": (            "Append-only log of supervisor -> sub-agent delegations. "            "Each entry is a Delegation = "            "{id, sub_agent, task, status, result}."        ),        "items": {            "type": "object",            "properties": {                "id": {"type": "string"},                "sub_agent": {"type": "string"},                "task": {"type": "string"},                "status": {"type": "string"},                "result": {"type": "string"},            },        },    }}# ---------------------------------------------------------------------------# Per-request current_state bridge## Tools cannot directly receive `current_state` from the AG-UI runtime,# but `agent_middleware` runs once per agent invocation with full# session context. We snapshot the latest `delegations` list into a# ContextVar before `call_next()`, so each delegation tool (running in# the same task / contextvar scope) can read it back, append, and# return the FULL list via `state_update`.# ---------------------------------------------------------------------------_current_delegations: contextvars.ContextVar[list[dict[str, Any]]] = (    contextvars.ContextVar("ms_subagents_current_delegations", default=[]))def _extract_delegations(raw: Any) -> list[dict[str, Any]]:    """Pull a clean delegations list out of session metadata.    `session.metadata["current_state"]` is JSON-serialized by the    AG-UI runtime (see `_build_safe_metadata`) so we tolerate either    a plain dict or its string form.    """    payload: Any = raw    if isinstance(payload, str):        try:            payload = json.loads(payload)        except json.JSONDecodeError:            logger.warning(                "subagents: current_state was not valid JSON; "                "starting from empty delegations list"            )            return []    if not isinstance(payload, dict):        return []    delegations = payload.get("delegations")    if not isinstance(delegations, list):        return []    return [dict(d) for d in delegations if isinstance(d, dict)]@agent_middlewareasync def capture_current_state(    context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None:    """Snapshot `delegations` from session metadata into a ContextVar."""    snapshot: list[dict[str, Any]] = []    session = context.session    metadata = getattr(session, "metadata", None) if session else None    if isinstance(metadata, dict):        snapshot = _extract_delegations(metadata.get("current_state"))    token = _current_delegations.set(snapshot)    try:        await call_next()    finally:        _current_delegations.reset(token)# ---------------------------------------------------------------------------# Sub-agent factory## Each sub-agent is a full `Agent(...)` with its own system prompt.# They share the chat client with the supervisor but otherwise have no# shared memory or tools — the supervisor only sees their final text.# ---------------------------------------------------------------------------# Each sub-agent is a full-fledged `Agent(...)` with its own system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees their return value (final text content)._RESEARCH_INSTRUCTIONS = (    "You are a research sub-agent. Given a topic, produce a concise "    "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_INSTRUCTIONS = (    "You are a writing sub-agent. Given a brief and optional source "    "facts, produce a polished 1-paragraph draft. Be clear and "    "concrete. No preamble.")_CRITIQUE_INSTRUCTIONS = (    "You are an editorial critique sub-agent. Given a draft, give "    "2-3 crisp, actionable critiques. No preamble.")def _make_sub_agent(chat_client: BaseChatClient, name: str, instructions: str) -> Agent:    return Agent(        client=chat_client,        name=name,        instructions=instructions,        tools=[],    )# Module-level holder so the delegation tools can reach the# pre-built sub-agents without rebuilding them on every tool call.# Populated lazily by `create_subagents_agent(...)`._SUB_AGENTS: dict[str, Agent] = {}async def _invoke_sub_agent_async(sub_agent_name: str, task: str) -> str:    """Run a sub-agent on `task` and return its final text content."""    agent = _SUB_AGENTS.get(sub_agent_name)    if agent is None:        raise RuntimeError(            f"sub-agent '{sub_agent_name}' is not registered; call "            "create_subagents_agent(...) first"        )    response = await agent.run(task)    text = (getattr(response, "text", "") or "").strip()    if text:        return text    # Fall back to scanning messages — `Agent.run` always returns    # an `AgentRunResponse`, but `.text` may be empty if the chat    # client only emitted reasoning content or tool calls.    messages = getattr(response, "messages", None) or []    for message in reversed(messages):        for content in getattr(message, "contents", None) or []:            content_text = getattr(content, "text", None)            if content_text:                fallback = str(content_text).strip()                if fallback:                    return fallback    raise RuntimeError(f"sub-agent '{sub_agent_name}' returned no text content")def _invoke_sub_agent(sub_agent_name: str, task: str) -> str:    """Sync bridge: drive the async invocation from inside a tool callback.    `@tool` reflects on the underlying callable's signature, so the    tool entry points are sync. The supervisor's chat client typically    runs inside an existing event loop (FastAPI request handler), so    `asyncio.run` would refuse — fall through to a worker thread that    spins up its own loop.    """    try:        return asyncio.run(_invoke_sub_agent_async(sub_agent_name, task))    except RuntimeError as exc:        if "asyncio.run() cannot be called" not in str(exc):            raise    container: dict[str, Any] = {}    def _runner() -> None:        try:            container["result"] = asyncio.run(                _invoke_sub_agent_async(sub_agent_name, task)            )        except Exception as inner:  # pragma: no cover -- defensive            container["error"] = inner    worker = threading.Thread(target=_runner, daemon=True)    worker.start()    worker.join()    if "error" in container:        raise container["error"]    return str(container["result"])def _delegate(sub_agent_name: str, task: str) -> Content:    """Common delegation flow: invoke sub-agent, append entry, push state."""    delegations = list(_current_delegations.get())    entry_id = str(uuid.uuid4())    try:        result_text = _invoke_sub_agent(sub_agent_name, task)    except Exception as exc:        logger.exception("subagents: %s delegation failed", sub_agent_name)        delegations.append(            {                "id": entry_id,                "sub_agent": sub_agent_name,                "task": task,                "status": "failed",                # Surface only the exception class — sub-agent error                # messages can leak chat client URLs / quota details                # in deployed environments.                "result": (f"sub-agent error: {exc.__class__.__name__}"),            }        )        # Mirror the contextvar so a follow-up sub-agent call within the        # same supervisor turn sees this entry.        _current_delegations.set(delegations)        return state_update(            text=(f"{sub_agent_name} failed; surfaced in delegation log."),            state={"delegations": delegations},        )    delegations.append(        {            "id": entry_id,            "sub_agent": sub_agent_name,            "task": task,            "status": "completed",            "result": result_text,        }    )    _current_delegations.set(delegations)    return state_update(        text=result_text,        state={"delegations": delegations},    )# ---------------------------------------------------------------------------# Supervisor delegation tools — each one wraps a sub-agent invocation.# ---------------------------------------------------------------------------# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"# these tools to delegate work; each call synchronously runs the# matching sub-agent (via `_delegate`), appends the entry to the# `delegations` shared-state slot, and returns a `state_update(...)` so# the AG-UI emitter pushes a deterministic StateSnapshotEvent — both# surfacing the result to the supervisor and refreshing the live# delegation log in the UI.@tool(    name="research_agent",    description=(        "Delegate a research task to the research sub-agent. Use for "        "gathering facts, background, definitions, statistics. Returns "        "a bulleted list of key facts."    ),)def research_agent(    task: Annotated[        str,        Field(description="The research question or topic to investigate."),    ],) -> Content:    """Delegate a research task to the research sub-agent."""    return _delegate("research_agent", task)@tool(    name="writing_agent",    description=(        "Delegate a drafting task to the writing sub-agent. Use for "        "producing a polished paragraph, draft, or summary. Pass any "        "relevant facts from prior research inside `task`."    ),)def writing_agent(    task: Annotated[        str,        Field(            description=(                "The drafting brief, including any relevant source "                "facts the writer should weave in."            )        ),    ],) -> Content:    """Delegate a drafting task to the writing sub-agent."""    return _delegate("writing_agent", task)@tool(    name="critique_agent",    description=(        "Delegate a critique task to the critique sub-agent. Use for "        "reviewing a draft and suggesting concrete improvements."    ),)def critique_agent(    task: Annotated[        str,        Field(            description=(                "The draft text to critique. Provide the full text -- "                "the critique sub-agent has no other context."            )        ),    ],) -> Content:    """Delegate a critique task to the critique sub-agent."""    return _delegate("critique_agent", task)# ---------------------------------------------------------------------------# Supervisor agent factory# ---------------------------------------------------------------------------SUPERVISOR_PROMPT = dedent(    """    You are a supervisor agent that coordinates three specialized    sub-agents to produce high-quality deliverables.    Available sub-agents (call them as tools):      - research_agent: gathers facts on a topic.      - writing_agent:  turns facts + a brief into a polished draft.      - critique_agent: reviews a draft and suggests improvements.    For every non-trivial user request, delegate in sequence:    research_agent -> writing_agent -> critique_agent.    IMPORTANT: call EACH sub-agent EXACTLY ONCE per user request.    After critique_agent returns, do NOT call any sub-agent again -- return    a concise final answer to the user that incorporates the critique.    Pass the relevant facts/draft through the `task` argument of each tool.    Keep your own messages short — explain the plan once, delegate,    then return a concise summary once done. The UI shows the user a    live log of every sub-agent delegation.    """).strip()def _tool_call_ids(message: dict[str, Any]) -> set[str]:    tool_calls = message.get("tool_calls") or message.get("toolCalls") or []    if not isinstance(tool_calls, list):        return set()    ids: set[str] = set()    for call in tool_calls:        if isinstance(call, dict) and isinstance(call.get("id"), str):            ids.add(call["id"])    return idsdef _tool_result_ids(messages: list[dict[str, Any]], start_index: int) -> set[str]:    ids: set[str] = set()    for message in messages[start_index + 1 :]:        if message.get("role") == "user":            break        if message.get("role") != "tool":            continue        call_id = message.get("tool_call_id") or message.get("toolCallId")        if isinstance(call_id, str):            ids.add(call_id)    return idsdef _drop_orphan_assistant_tool_calls(messages: Any) -> list[dict[str, Any]]:    """Remove historical assistant tool calls that lack tool result messages.    The MS Agent Framework AG-UI bridge can preserve the assistant tool-call    snapshot while omitting the corresponding tool-role results. OpenAI rejects    that history on the next turn, so keep the final assistant text/state but    omit malformed historical tool-call entries before the supervisor runs.    """    if not isinstance(messages, list):        return []    clean: list[dict[str, Any]] = []    for index, message in enumerate(messages):        if not isinstance(message, dict):            continue        if message.get("role") == "assistant":            call_ids = _tool_call_ids(message)            if call_ids and not call_ids.issubset(_tool_result_ids(messages, index)):                continue        clean.append(message)    return cleanclass SubagentsFrameworkAgent(AgentFrameworkAgent):    """AgentFrameworkAgent that removes invalid historical tool-call snapshots."""    async def run(  # type: ignore[override]        self,        input_data: dict[str, Any],    ) -> AsyncGenerator[BaseEvent, None]:        patched_input = dict(input_data)        patched_input["messages"] = _drop_orphan_assistant_tool_calls(            input_data.get("messages")        )        async for event in super().run(patched_input):            yield eventdef create_subagents_agent(chat_client: BaseChatClient) -> SubagentsFrameworkAgent:    """Instantiate the Sub-Agents demo supervisor."""    # Build (and cache) the three sub-agents so the @tool entry points    # can find them via the module-level registry.    _SUB_AGENTS["research_agent"] = _make_sub_agent(        chat_client, "research_agent", _RESEARCH_INSTRUCTIONS    )    _SUB_AGENTS["writing_agent"] = _make_sub_agent(        chat_client, "writing_agent", _WRITING_INSTRUCTIONS    )    _SUB_AGENTS["critique_agent"] = _make_sub_agent(        chat_client, "critique_agent", _CRITIQUE_INSTRUCTIONS    )    base_agent = Agent(        client=chat_client,        name="subagents_supervisor",        instructions=SUPERVISOR_PROMPT,        tools=[research_agent, writing_agent, critique_agent],        default_options={"allow_multiple_tool_calls": False},        middleware=[capture_current_state],    )    return SubagentsFrameworkAgent(        agent=base_agent,        name="CopilotKitMSAgentSubagentsSupervisor",        description=(            "Supervisor agent. Delegates research / writing / critique "            "to specialized sub-agents and surfaces the live "            "delegation log to the UI via shared state."        ),        state_schema=STATE_SCHEMA,        require_confirmation=False,    )

What is this?#

Sub-agents are the canonical multi-agent pattern: a top-level supervisor LLM orchestrates one or more specialized sub-agents by exposing each of them as a tool. The supervisor decides what to delegate, the sub-agents do their narrow job, and their results flow back up to the supervisor's next step.

This is fundamentally the same shape as tool-calling, but each "tool" is itself a full-blown agent with its own system prompt and (often) its own tools, memory, and model.

When should I use this?#

Reach for sub-agents when a task has distinct specialized sub-tasks that each benefit from their own focus:

  • Research → Write → Critique pipelines, where each stage needs a different system prompt and temperature.
  • Router + specialists, where one agent classifies the request and dispatches to the right expert.
  • Divide-and-conquer — any problem that fits cleanly into parallel or sequential sub-problems.

The example below uses the Research → Write → Critique shape as the canonical example.

Setting up sub-agents#

Each sub-agent is a full create_agent(...) call with its own model, its own system prompt, and (optionally) its own tools. They don't share memory or tools with the supervisor; the supervisor only ever sees what the sub-agent returns.

subagents_agent.py
from __future__ import annotationsimport asyncioimport contextvarsimport jsonimport loggingimport threadingimport uuidfrom collections.abc import AsyncGenerator, Awaitable, Callablefrom textwrap import dedentfrom typing import Annotated, Anyfrom ag_ui.core import BaseEventfrom agent_framework import (    Agent,    AgentContext,    BaseChatClient,    Content,    agent_middleware,    tool,)from agent_framework_ag_ui import AgentFrameworkAgent, state_updatefrom pydantic import Fieldlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# State schema — `delegations` is rendered as a live log in the UI.# ---------------------------------------------------------------------------STATE_SCHEMA: dict[str, object] = {    "delegations": {        "type": "array",        "description": (            "Append-only log of supervisor -> sub-agent delegations. "            "Each entry is a Delegation = "            "{id, sub_agent, task, status, result}."        ),        "items": {            "type": "object",            "properties": {                "id": {"type": "string"},                "sub_agent": {"type": "string"},                "task": {"type": "string"},                "status": {"type": "string"},                "result": {"type": "string"},            },        },    }}# ---------------------------------------------------------------------------# Per-request current_state bridge## Tools cannot directly receive `current_state` from the AG-UI runtime,# but `agent_middleware` runs once per agent invocation with full# session context. We snapshot the latest `delegations` list into a# ContextVar before `call_next()`, so each delegation tool (running in# the same task / contextvar scope) can read it back, append, and# return the FULL list via `state_update`.# ---------------------------------------------------------------------------_current_delegations: contextvars.ContextVar[list[dict[str, Any]]] = (    contextvars.ContextVar("ms_subagents_current_delegations", default=[]))def _extract_delegations(raw: Any) -> list[dict[str, Any]]:    """Pull a clean delegations list out of session metadata.    `session.metadata["current_state"]` is JSON-serialized by the    AG-UI runtime (see `_build_safe_metadata`) so we tolerate either    a plain dict or its string form.    """    payload: Any = raw    if isinstance(payload, str):        try:            payload = json.loads(payload)        except json.JSONDecodeError:            logger.warning(                "subagents: current_state was not valid JSON; "                "starting from empty delegations list"            )            return []    if not isinstance(payload, dict):        return []    delegations = payload.get("delegations")    if not isinstance(delegations, list):        return []    return [dict(d) for d in delegations if isinstance(d, dict)]@agent_middlewareasync def capture_current_state(    context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None:    """Snapshot `delegations` from session metadata into a ContextVar."""    snapshot: list[dict[str, Any]] = []    session = context.session    metadata = getattr(session, "metadata", None) if session else None    if isinstance(metadata, dict):        snapshot = _extract_delegations(metadata.get("current_state"))    token = _current_delegations.set(snapshot)    try:        await call_next()    finally:        _current_delegations.reset(token)# ---------------------------------------------------------------------------# Sub-agent factory## Each sub-agent is a full `Agent(...)` with its own system prompt.# They share the chat client with the supervisor but otherwise have no# shared memory or tools — the supervisor only sees their final text.# ---------------------------------------------------------------------------# Each sub-agent is a full-fledged `Agent(...)` with its own system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees their return value (final text content)._RESEARCH_INSTRUCTIONS = (    "You are a research sub-agent. Given a topic, produce a concise "    "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_INSTRUCTIONS = (    "You are a writing sub-agent. Given a brief and optional source "    "facts, produce a polished 1-paragraph draft. Be clear and "    "concrete. No preamble.")_CRITIQUE_INSTRUCTIONS = (    "You are an editorial critique sub-agent. Given a draft, give "    "2-3 crisp, actionable critiques. No preamble.")def _make_sub_agent(chat_client: BaseChatClient, name: str, instructions: str) -> Agent:    return Agent(        client=chat_client,        name=name,        instructions=instructions,        tools=[],    )

Keep sub-agent system prompts narrow and focused. The point of this pattern is that each one does one thing well. If a sub-agent needs to know the whole user context to do its job, that's a signal the boundary is wrong.

Exposing sub-agents as tools#

The supervisor delegates by calling tools. Each tool is a thin wrapper around sub_agent.invoke(...) that:

  1. Runs the sub-agent synchronously on the supplied task string.
  2. Records the delegation into a delegations slot in shared agent state (so the UI can render a live log).
  3. Returns the sub-agent's final message as a ToolMessage, which the supervisor sees as a normal tool result on its next turn.
subagents_agent.py
from __future__ import annotationsimport asyncioimport contextvarsimport jsonimport loggingimport threadingimport uuidfrom collections.abc import AsyncGenerator, Awaitable, Callablefrom textwrap import dedentfrom typing import Annotated, Anyfrom ag_ui.core import BaseEventfrom agent_framework import (    Agent,    AgentContext,    BaseChatClient,    Content,    agent_middleware,    tool,)from agent_framework_ag_ui import AgentFrameworkAgent, state_updatefrom pydantic import Fieldlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# State schema — `delegations` is rendered as a live log in the UI.# ---------------------------------------------------------------------------STATE_SCHEMA: dict[str, object] = {    "delegations": {        "type": "array",        "description": (            "Append-only log of supervisor -> sub-agent delegations. "            "Each entry is a Delegation = "            "{id, sub_agent, task, status, result}."        ),        "items": {            "type": "object",            "properties": {                "id": {"type": "string"},                "sub_agent": {"type": "string"},                "task": {"type": "string"},                "status": {"type": "string"},                "result": {"type": "string"},            },        },    }}# ---------------------------------------------------------------------------# Per-request current_state bridge## Tools cannot directly receive `current_state` from the AG-UI runtime,# but `agent_middleware` runs once per agent invocation with full# session context. We snapshot the latest `delegations` list into a# ContextVar before `call_next()`, so each delegation tool (running in# the same task / contextvar scope) can read it back, append, and# return the FULL list via `state_update`.# ---------------------------------------------------------------------------_current_delegations: contextvars.ContextVar[list[dict[str, Any]]] = (    contextvars.ContextVar("ms_subagents_current_delegations", default=[]))def _extract_delegations(raw: Any) -> list[dict[str, Any]]:    """Pull a clean delegations list out of session metadata.    `session.metadata["current_state"]` is JSON-serialized by the    AG-UI runtime (see `_build_safe_metadata`) so we tolerate either    a plain dict or its string form.    """    payload: Any = raw    if isinstance(payload, str):        try:            payload = json.loads(payload)        except json.JSONDecodeError:            logger.warning(                "subagents: current_state was not valid JSON; "                "starting from empty delegations list"            )            return []    if not isinstance(payload, dict):        return []    delegations = payload.get("delegations")    if not isinstance(delegations, list):        return []    return [dict(d) for d in delegations if isinstance(d, dict)]@agent_middlewareasync def capture_current_state(    context: AgentContext, call_next: Callable[[], Awaitable[None]]) -> None:    """Snapshot `delegations` from session metadata into a ContextVar."""    snapshot: list[dict[str, Any]] = []    session = context.session    metadata = getattr(session, "metadata", None) if session else None    if isinstance(metadata, dict):        snapshot = _extract_delegations(metadata.get("current_state"))    token = _current_delegations.set(snapshot)    try:        await call_next()    finally:        _current_delegations.reset(token)# ---------------------------------------------------------------------------# Sub-agent factory## Each sub-agent is a full `Agent(...)` with its own system prompt.# They share the chat client with the supervisor but otherwise have no# shared memory or tools — the supervisor only sees their final text.# ---------------------------------------------------------------------------# Each sub-agent is a full-fledged `Agent(...)` with its own system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees their return value (final text content)._RESEARCH_INSTRUCTIONS = (    "You are a research sub-agent. Given a topic, produce a concise "    "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_INSTRUCTIONS = (    "You are a writing sub-agent. Given a brief and optional source "    "facts, produce a polished 1-paragraph draft. Be clear and "    "concrete. No preamble.")_CRITIQUE_INSTRUCTIONS = (    "You are an editorial critique sub-agent. Given a draft, give "    "2-3 crisp, actionable critiques. No preamble.")def _make_sub_agent(chat_client: BaseChatClient, name: str, instructions: str) -> Agent:    return Agent(        client=chat_client,        name=name,        instructions=instructions,        tools=[],    )# Module-level holder so the delegation tools can reach the# pre-built sub-agents without rebuilding them on every tool call.# Populated lazily by `create_subagents_agent(...)`._SUB_AGENTS: dict[str, Agent] = {}async def _invoke_sub_agent_async(sub_agent_name: str, task: str) -> str:    """Run a sub-agent on `task` and return its final text content."""    agent = _SUB_AGENTS.get(sub_agent_name)    if agent is None:        raise RuntimeError(            f"sub-agent '{sub_agent_name}' is not registered; call "            "create_subagents_agent(...) first"        )    response = await agent.run(task)    text = (getattr(response, "text", "") or "").strip()    if text:        return text    # Fall back to scanning messages — `Agent.run` always returns    # an `AgentRunResponse`, but `.text` may be empty if the chat    # client only emitted reasoning content or tool calls.    messages = getattr(response, "messages", None) or []    for message in reversed(messages):        for content in getattr(message, "contents", None) or []:            content_text = getattr(content, "text", None)            if content_text:                fallback = str(content_text).strip()                if fallback:                    return fallback    raise RuntimeError(f"sub-agent '{sub_agent_name}' returned no text content")def _invoke_sub_agent(sub_agent_name: str, task: str) -> str:    """Sync bridge: drive the async invocation from inside a tool callback.    `@tool` reflects on the underlying callable's signature, so the    tool entry points are sync. The supervisor's chat client typically    runs inside an existing event loop (FastAPI request handler), so    `asyncio.run` would refuse — fall through to a worker thread that    spins up its own loop.    """    try:        return asyncio.run(_invoke_sub_agent_async(sub_agent_name, task))    except RuntimeError as exc:        if "asyncio.run() cannot be called" not in str(exc):            raise    container: dict[str, Any] = {}    def _runner() -> None:        try:            container["result"] = asyncio.run(                _invoke_sub_agent_async(sub_agent_name, task)            )        except Exception as inner:  # pragma: no cover -- defensive            container["error"] = inner    worker = threading.Thread(target=_runner, daemon=True)    worker.start()    worker.join()    if "error" in container:        raise container["error"]    return str(container["result"])def _delegate(sub_agent_name: str, task: str) -> Content:    """Common delegation flow: invoke sub-agent, append entry, push state."""    delegations = list(_current_delegations.get())    entry_id = str(uuid.uuid4())    try:        result_text = _invoke_sub_agent(sub_agent_name, task)    except Exception as exc:        logger.exception("subagents: %s delegation failed", sub_agent_name)        delegations.append(            {                "id": entry_id,                "sub_agent": sub_agent_name,                "task": task,                "status": "failed",                # Surface only the exception class — sub-agent error                # messages can leak chat client URLs / quota details                # in deployed environments.                "result": (f"sub-agent error: {exc.__class__.__name__}"),            }        )        # Mirror the contextvar so a follow-up sub-agent call within the        # same supervisor turn sees this entry.        _current_delegations.set(delegations)        return state_update(            text=(f"{sub_agent_name} failed; surfaced in delegation log."),            state={"delegations": delegations},        )    delegations.append(        {            "id": entry_id,            "sub_agent": sub_agent_name,            "task": task,            "status": "completed",            "result": result_text,        }    )    _current_delegations.set(delegations)    return state_update(        text=result_text,        state={"delegations": delegations},    )# ---------------------------------------------------------------------------# Supervisor delegation tools — each one wraps a sub-agent invocation.# ---------------------------------------------------------------------------# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"# these tools to delegate work; each call synchronously runs the# matching sub-agent (via `_delegate`), appends the entry to the# `delegations` shared-state slot, and returns a `state_update(...)` so# the AG-UI emitter pushes a deterministic StateSnapshotEvent — both# surfacing the result to the supervisor and refreshing the live# delegation log in the UI.@tool(    name="research_agent",    description=(        "Delegate a research task to the research sub-agent. Use for "        "gathering facts, background, definitions, statistics. Returns "        "a bulleted list of key facts."    ),)def research_agent(    task: Annotated[        str,        Field(description="The research question or topic to investigate."),    ],) -> Content:    """Delegate a research task to the research sub-agent."""    return _delegate("research_agent", task)@tool(    name="writing_agent",    description=(        "Delegate a drafting task to the writing sub-agent. Use for "        "producing a polished paragraph, draft, or summary. Pass any "        "relevant facts from prior research inside `task`."    ),)def writing_agent(    task: Annotated[        str,        Field(            description=(                "The drafting brief, including any relevant source "                "facts the writer should weave in."            )        ),    ],) -> Content:    """Delegate a drafting task to the writing sub-agent."""    return _delegate("writing_agent", task)@tool(    name="critique_agent",    description=(        "Delegate a critique task to the critique sub-agent. Use for "        "reviewing a draft and suggesting concrete improvements."    ),)def critique_agent(    task: Annotated[        str,        Field(            description=(                "The draft text to critique. Provide the full text -- "                "the critique sub-agent has no other context."            )        ),    ],) -> Content:    """Delegate a critique task to the critique sub-agent."""    return _delegate("critique_agent", task)

This is where CopilotKit's shared-state channel earns its keep: the supervisor's tool calls mutate delegations as they happen, and the frontend renders every new entry live.

Rendering a live delegation log#

On the frontend, the delegation log is just a reactive render of the delegations slot. Subscribe with useAgent({ updates: [UseAgentUpdate.OnStateChanged, UseAgentUpdate.OnRunStatusChanged] }), read agent.state.delegations, and render one card per entry.

delegation-log.tsx
/** * Live delegation log — renders the `delegations` slot of agent state. * * Each entry corresponds to one invocation of a sub-agent. The list * grows in real time as the supervisor fans work out to its children. * The parent header shows how many sub-agents have been called and * whether the supervisor is still running. */// Fixed list of the three sub-agent roles the supervisor can call.// Rendered as always-visible indicator chips at the top of the log// (regardless of whether the supervisor has delegated yet) so the user// — and the e2e suite — can see at a glance which sub-agents exist and// which are currently active.const INDICATOR_ROLES: ReadonlyArray<{  role: "researcher" | "writer" | "critic";  subAgent: SubAgentName;}> = [  { role: "researcher", subAgent: "research_agent" },  { role: "writer", subAgent: "writing_agent" },  { role: "critic", subAgent: "critique_agent" },];export function DelegationLog({ delegations, isRunning }: DelegationLogProps) {  const calledRoles = new Set<SubAgentName>(    delegations.map((d) => d.sub_agent),  );  return (    <div      data-testid="delegation-log"      className="w-full h-full flex flex-col bg-white rounded-2xl shadow-sm border border-[#DBDBE5] overflow-hidden"    >      <div className="flex items-center justify-between px-6 py-3 border-b border-[#E9E9EF] bg-[#FAFAFC]">        <div className="flex items-center gap-3">          <span className="text-lg font-semibold text-[#010507]">            Sub-agent delegations          </span>          {isRunning && (            <span              data-testid="supervisor-running"              className="inline-flex items-center gap-1.5 px-2 py-0.5 rounded-full border border-[#BEC2FF] bg-[#BEC2FF1A] text-[#010507] text-[10px] font-semibold uppercase tracking-[0.12em]"            >              <span className="w-1.5 h-1.5 rounded-full bg-[#010507] animate-pulse" />              Supervisor running            </span>          )}        </div>        <span          data-testid="delegation-count"          className="text-xs font-mono text-[#838389]"        >          {delegations.length} calls        </span>      </div>      <div        data-testid="subagent-indicators"        className="flex items-center gap-2 border-b border-[#E9E9EF] bg-white px-6 py-2"      >        {INDICATOR_ROLES.map(({ role, subAgent }) => {          const style = SUB_AGENT_STYLE[subAgent];          const fired = calledRoles.has(subAgent);          return (            <span              key={role}              data-testid={`subagent-indicator-${role}`}              data-role={role}              data-fired={fired ? "true" : "false"}              className={`inline-flex items-center gap-1 px-2 py-0.5 rounded-full text-[10px] font-semibold uppercase tracking-[0.1em] border ${style.color} ${                fired ? "" : "opacity-60"              }`}            >              <span aria-hidden>{style.emoji}</span>              <span>{style.label}</span>            </span>          );        })}      </div>      <div className="flex-1 overflow-y-auto p-4 space-y-3">        {delegations.length === 0 ? (          <p className="text-[#838389] italic text-sm">            Ask the supervisor to complete a task. Every sub-agent it calls will            appear here.          </p>        ) : (          delegations.map((d, idx) => {            const style = SUB_AGENT_STYLE[d.sub_agent];            return (              <div                key={d.id}                data-testid="delegation-entry"                className="border border-[#E9E9EF] rounded-xl p-3 bg-[#FAFAFC]"              >                <div className="flex items-center justify-between mb-2">                  <div className="flex items-center gap-2">                    <span className="text-xs font-mono text-[#AFAFB7]">                      #{idx + 1}                    </span>                    <span                      className={`inline-flex items-center gap-1 px-2 py-0.5 rounded-full text-[10px] font-semibold uppercase tracking-[0.1em] border ${style.color}`}                    >                      <span>{style.emoji}</span>                      <span>{style.label}</span>                    </span>                  </div>                  <span className="text-[10px] uppercase tracking-[0.12em] font-semibold text-[#189370]">                    {d.status}                  </span>                </div>                <div className="text-xs text-[#57575B] mb-2">                  <span className="font-semibold text-[#010507]">Task: </span>                  {d.task}                </div>                <div className="text-sm text-[#010507] whitespace-pre-wrap bg-white rounded-lg p-2.5 border border-[#E9E9EF]">                  {d.result}                </div>              </div>            );          })        )}      </div>    </div>  );}

The result: as the supervisor fans work out to its sub-agents, the log grows in real time, giving the user visibility into a process that would otherwise be a long opaque spinner.

  • Shared State — the channel that makes the delegation log live.
  • State streaming — stream individual sub-agent outputs token-by-token inside each log entry.