CopilotKit

Sub-Agents

Decompose work across multiple specialized agents with a visible delegation log.


"""LlamaIndex agent backing the Sub-Agents demo.Mirrors `langgraph-python/src/agents/subagents.py` and`google-adk/src/agents/subagents_agent.py`. A supervisor agent (thedefault `AGUIChatWorkflow` wired below) delegates to three specializedsub-agents — research / writing / critique — exposed as backend tools.Each sub-agent is a stand-alone single-shot LLM call (mirrors thegoogle-adk pattern: a `FunctionAgent`-equivalent invocation per delegationkeeps the supervisor's tool surface small). Every delegation appends a`Delegation` entry to `state["delegations"]`:    {id, sub_agent, task, status: "running"|"completed"|"failed", result}The router emits a `StateSnapshotWorkflowEvent` after every tool call, sothe frontend's `useAgent({ updates: [OnStateChanged] })` subscriptionreceives a live delegation log as the supervisor fans work out.Implementation notes:- We use a stand-alone `FunctionAgent` per sub-agent so each has its own  isolated `system_prompt` and message context. The supervisor only sees  the sub-agent's final text via the tool's return value.- `state["delegations"]` is mutated in place inside the supervisor's  tools; the router's state snapshot picks up the change automatically."""import loggingimport osimport uuidfrom typing import Annotated, Anyfrom llama_index.core.agent.workflow import FunctionAgentfrom llama_index.core.workflow import Contextfrom llama_index.llms.openai import OpenAIfrom llama_index.protocols.ag_ui.router import get_ag_ui_workflow_routerlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# Sub-agents — one FunctionAgent per role, each with its own system prompt.# These are stand-alone agents the supervisor cannot share memory with;# the supervisor only sees the final text the sub-agent returns.# ---------------------------------------------------------------------------_openai_kwargs = {}if os.environ.get("OPENAI_BASE_URL"):    _openai_kwargs["api_base"] = os.environ["OPENAI_BASE_URL"]_SUB_LLM = OpenAI(model="gpt-4.1-mini", **_openai_kwargs)_RESEARCH_SYSTEM = (    "You are a research sub-agent. Given a topic, produce a concise "    "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_SYSTEM = (    "You are a writing sub-agent. Given a brief and optional source "    "facts, produce a polished 1-paragraph draft. Be clear and concrete. "    "No preamble.")_CRITIQUE_SYSTEM = (    "You are an editorial critique sub-agent. Given a draft, give 2-3 "    "crisp, actionable critiques. No preamble.")def _build_sub_agent(system_prompt: str, name: str) -> FunctionAgent:    # `timeout=60` so a stalled sub-agent run can never wedge the    # supervisor's tool call indefinitely.    return FunctionAgent(        name=name,        description=system_prompt,        llm=_SUB_LLM,        tools=[],        system_prompt=system_prompt,        timeout=60,    )_research_sub = _build_sub_agent(_RESEARCH_SYSTEM, "research_sub")_writing_sub = _build_sub_agent(_WRITING_SYSTEM, "writing_sub")_critique_sub = _build_sub_agent(_CRITIQUE_SYSTEM, "critique_sub")class _SubAgentError(Exception):    """Raised when a sub-agent invocation fails.    Carries a user-facing message safe to surface in the delegation log.    """async def _invoke_sub_agent(agent: FunctionAgent, task: str) -> str:    """Run a sub-agent on `task` and return its final response text."""    try:        response = await agent.run(user_msg=task)    except Exception as exc:  # noqa: BLE001 - we re-raise with safe message        logger.exception("subagent: FunctionAgent.run failed")        raise _SubAgentError(            f"sub-agent call failed: {exc.__class__.__name__} "            "(see server logs for details)"        ) from exc    text = str(response).strip()    if not text:        raise _SubAgentError("sub-agent returned empty text")    return text# ---------------------------------------------------------------------------# Delegation log helpers (mutate state["delegations"] in place)# ---------------------------------------------------------------------------async def _append_running_delegation(ctx: Context, *, sub_agent: str, task: str) -> str:    """Append a `running` delegation entry; return its id."""    state: dict[str, Any] = await ctx.store.get("state", default={})    delegations = list(state.get("delegations") or [])    entry_id = str(uuid.uuid4())    delegations.append(        {            "id": entry_id,            "sub_agent": sub_agent,            "task": task,            "status": "running",            "result": "",        }    )    state["delegations"] = delegations    await ctx.store.set("state", state)    return entry_idasync def _finalize_delegation(    ctx: Context, *, entry_id: str, status: str, result: str) -> None:    """Replace the matching entry's status + result.    If the entry has gone missing (e.g. another writer replaced    `state['delegations']` mid-turn) we log loudly and skip — slipping in    a synthetic entry with `sub_agent='unknown'` would render as    undefined badge text in `delegation-log.tsx`.    """    state: dict[str, Any] = await ctx.store.get("state", default={})    delegations = list(state.get("delegations") or [])    for entry in delegations:        if entry.get("id") == entry_id:            entry["status"] = status            entry["result"] = result            state["delegations"] = delegations            await ctx.store.set("state", state)            return    logger.warning(        "subagent: delegation entry %s missing on update — final %s "        "state will not be rendered",        entry_id,        status,    )async def _delegate(    ctx: Context, *, sub_agent_name: str, agent: FunctionAgent, task: str) -> dict[str, Any]:    """Append a running entry, run the sub-agent, finalize the entry."""    entry_id = await _append_running_delegation(        ctx, sub_agent=sub_agent_name, task=task    )    try:        result = await _invoke_sub_agent(agent, task)    except _SubAgentError as exc:        await _finalize_delegation(            ctx, entry_id=entry_id, status="failed", result=str(exc)        )        return {"status": "failed", "error": str(exc)}    await _finalize_delegation(        ctx, entry_id=entry_id, status="completed", result=result    )    return {"status": "completed", "result": result}# ---------------------------------------------------------------------------# Supervisor tools — each delegates to one sub-agent.# ---------------------------------------------------------------------------async def research_agent(    ctx: Context,    task: Annotated[        str,        "Research brief — the topic / question to gather facts on.",    ],) -> str:    """Delegate a research task to the research sub-agent.    Use for: gathering facts, background, definitions, statistics.    Returns a JSON-ish string of {status, result|error}.    """    outcome = await _delegate(        ctx,        sub_agent_name="research_agent",        agent=_research_sub,        task=task,    )    return _stringify_outcome(outcome)async def writing_agent(    ctx: Context,    task: Annotated[        str,        "Writing brief — include relevant facts from prior research.",    ],) -> str:    """Delegate a drafting task to the writing sub-agent.    Use for: producing a polished paragraph, draft, or summary. Pass    relevant facts from prior research inside `task`.    """    outcome = await _delegate(        ctx,        sub_agent_name="writing_agent",        agent=_writing_sub,        task=task,    )    return _stringify_outcome(outcome)async def critique_agent(    ctx: Context,    task: Annotated[str, "The draft to critique."],) -> str:    """Delegate a critique task to the critique sub-agent.    Use for: reviewing a draft and suggesting concrete improvements.    """    outcome = await _delegate(        ctx,        sub_agent_name="critique_agent",        agent=_critique_sub,        task=task,    )    return _stringify_outcome(outcome)def _stringify_outcome(outcome: dict[str, Any]) -> str:    """Render the delegation outcome as plain text the supervisor LLM can read."""    if outcome.get("status") == "completed":        return str(outcome.get("result") or "")    return f"[sub-agent failed] {outcome.get('error') or 'unknown error'}"# ---------------------------------------------------------------------------# Supervisor (the workflow router we export).# ---------------------------------------------------------------------------SUPERVISOR_SYSTEM_PROMPT = (    "You are a supervisor agent that coordinates three specialized "    "sub-agents to produce high-quality deliverables.\n\n"    "Available sub-agents (call them as tools):\n"    "  - research_agent: gathers facts on a topic.\n"    "  - writing_agent: turns facts + a brief into a polished draft.\n"    "  - critique_agent: reviews a draft and suggests improvements.\n\n"    "For most non-trivial user requests, delegate in sequence: "    "research -> write -> critique. Pass the relevant facts/draft through "    "the `task` argument of each tool. Each tool returns either the "    "sub-agent's text output or a `[sub-agent failed]` prefix on failure. "    "If a sub-agent fails, briefly surface the failure to the user (do "    "not fabricate a result) and decide whether to retry. Keep your own "    "messages short — explain the plan once, delegate, then return a "    "concise summary once done. The UI renders a live log of every "    "sub-agent delegation.")subagents_router = get_ag_ui_workflow_router(    llm=OpenAI(model="gpt-4.1", **_openai_kwargs),    frontend_tools=[],    backend_tools=[research_agent, writing_agent, critique_agent],    system_prompt=SUPERVISOR_SYSTEM_PROMPT,    initial_state={"delegations": []},)

What is this?#

Sub-agents are the canonical multi-agent pattern: a top-level supervisor LLM orchestrates one or more specialized sub-agents by exposing each of them as a tool. The supervisor decides what to delegate, the sub-agents do their narrow job, and their results flow back up to the supervisor's next step.

This is fundamentally the same shape as tool-calling, but each "tool" is itself a full-blown agent with its own system prompt and (often) its own tools, memory, and model.

When should I use this?#

Reach for sub-agents when a task has distinct specialized sub-tasks that each benefit from their own focus:

  • Research → Write → Critique pipelines, where each stage needs a different system prompt and temperature.
  • Router + specialists, where one agent classifies the request and dispatches to the right expert.
  • Divide-and-conquer — any problem that fits cleanly into parallel or sequential sub-problems.

The example below uses the Research → Write → Critique shape as the canonical example.

Setting up sub-agents#

Each sub-agent is a full create_agent(...) call with its own model, its own system prompt, and (optionally) its own tools. They don't share memory or tools with the supervisor; the supervisor only ever sees what the sub-agent returns.

subagents_agent.py
import loggingimport osimport uuidfrom typing import Annotated, Anyfrom llama_index.core.agent.workflow import FunctionAgentfrom llama_index.core.workflow import Contextfrom llama_index.llms.openai import OpenAIfrom llama_index.protocols.ag_ui.router import get_ag_ui_workflow_routerlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# Sub-agents — one FunctionAgent per role, each with its own system prompt.# These are stand-alone agents the supervisor cannot share memory with;# the supervisor only sees the final text the sub-agent returns.# ---------------------------------------------------------------------------_openai_kwargs = {}if os.environ.get("OPENAI_BASE_URL"):    _openai_kwargs["api_base"] = os.environ["OPENAI_BASE_URL"]_SUB_LLM = OpenAI(model="gpt-4.1-mini", **_openai_kwargs)_RESEARCH_SYSTEM = (    "You are a research sub-agent. Given a topic, produce a concise "    "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_SYSTEM = (    "You are a writing sub-agent. Given a brief and optional source "    "facts, produce a polished 1-paragraph draft. Be clear and concrete. "    "No preamble.")_CRITIQUE_SYSTEM = (    "You are an editorial critique sub-agent. Given a draft, give 2-3 "    "crisp, actionable critiques. No preamble.")def _build_sub_agent(system_prompt: str, name: str) -> FunctionAgent:    # `timeout=60` so a stalled sub-agent run can never wedge the    # supervisor's tool call indefinitely.    return FunctionAgent(        name=name,        description=system_prompt,        llm=_SUB_LLM,        tools=[],        system_prompt=system_prompt,        timeout=60,    )_research_sub = _build_sub_agent(_RESEARCH_SYSTEM, "research_sub")_writing_sub = _build_sub_agent(_WRITING_SYSTEM, "writing_sub")_critique_sub = _build_sub_agent(_CRITIQUE_SYSTEM, "critique_sub")

Keep sub-agent system prompts narrow and focused. The point of this pattern is that each one does one thing well. If a sub-agent needs to know the whole user context to do its job, that's a signal the boundary is wrong.

Exposing sub-agents as tools#

The supervisor delegates by calling tools. Each tool is a thin wrapper around sub_agent.invoke(...) that:

  1. Runs the sub-agent synchronously on the supplied task string.
  2. Records the delegation into a delegations slot in shared agent state (so the UI can render a live log).
  3. Returns the sub-agent's final message as a ToolMessage, which the supervisor sees as a normal tool result on its next turn.
subagents_agent.py
import loggingimport osimport uuidfrom typing import Annotated, Anyfrom llama_index.core.agent.workflow import FunctionAgentfrom llama_index.core.workflow import Contextfrom llama_index.llms.openai import OpenAIfrom llama_index.protocols.ag_ui.router import get_ag_ui_workflow_routerlogger = logging.getLogger(__name__)# ---------------------------------------------------------------------------# Sub-agents — one FunctionAgent per role, each with its own system prompt.# These are stand-alone agents the supervisor cannot share memory with;# the supervisor only sees the final text the sub-agent returns.# ---------------------------------------------------------------------------_openai_kwargs = {}if os.environ.get("OPENAI_BASE_URL"):    _openai_kwargs["api_base"] = os.environ["OPENAI_BASE_URL"]_SUB_LLM = OpenAI(model="gpt-4.1-mini", **_openai_kwargs)_RESEARCH_SYSTEM = (    "You are a research sub-agent. Given a topic, produce a concise "    "bulleted list of 3-5 key facts. No preamble, no closing.")_WRITING_SYSTEM = (    "You are a writing sub-agent. Given a brief and optional source "    "facts, produce a polished 1-paragraph draft. Be clear and concrete. "    "No preamble.")_CRITIQUE_SYSTEM = (    "You are an editorial critique sub-agent. Given a draft, give 2-3 "    "crisp, actionable critiques. No preamble.")def _build_sub_agent(system_prompt: str, name: str) -> FunctionAgent:    # `timeout=60` so a stalled sub-agent run can never wedge the    # supervisor's tool call indefinitely.    return FunctionAgent(        name=name,        description=system_prompt,        llm=_SUB_LLM,        tools=[],        system_prompt=system_prompt,        timeout=60,    )_research_sub = _build_sub_agent(_RESEARCH_SYSTEM, "research_sub")_writing_sub = _build_sub_agent(_WRITING_SYSTEM, "writing_sub")_critique_sub = _build_sub_agent(_CRITIQUE_SYSTEM, "critique_sub")class _SubAgentError(Exception):    """Raised when a sub-agent invocation fails.    Carries a user-facing message safe to surface in the delegation log.    """async def _invoke_sub_agent(agent: FunctionAgent, task: str) -> str:    """Run a sub-agent on `task` and return its final response text."""    try:        response = await agent.run(user_msg=task)    except Exception as exc:  # noqa: BLE001 - we re-raise with safe message        logger.exception("subagent: FunctionAgent.run failed")        raise _SubAgentError(            f"sub-agent call failed: {exc.__class__.__name__} "            "(see server logs for details)"        ) from exc    text = str(response).strip()    if not text:        raise _SubAgentError("sub-agent returned empty text")    return text# ---------------------------------------------------------------------------# Delegation log helpers (mutate state["delegations"] in place)# ---------------------------------------------------------------------------async def _append_running_delegation(ctx: Context, *, sub_agent: str, task: str) -> str:    """Append a `running` delegation entry; return its id."""    state: dict[str, Any] = await ctx.store.get("state", default={})    delegations = list(state.get("delegations") or [])    entry_id = str(uuid.uuid4())    delegations.append(        {            "id": entry_id,            "sub_agent": sub_agent,            "task": task,            "status": "running",            "result": "",        }    )    state["delegations"] = delegations    await ctx.store.set("state", state)    return entry_idasync def _finalize_delegation(    ctx: Context, *, entry_id: str, status: str, result: str) -> None:    """Replace the matching entry's status + result.    If the entry has gone missing (e.g. another writer replaced    `state['delegations']` mid-turn) we log loudly and skip — slipping in    a synthetic entry with `sub_agent='unknown'` would render as    undefined badge text in `delegation-log.tsx`.    """    state: dict[str, Any] = await ctx.store.get("state", default={})    delegations = list(state.get("delegations") or [])    for entry in delegations:        if entry.get("id") == entry_id:            entry["status"] = status            entry["result"] = result            state["delegations"] = delegations            await ctx.store.set("state", state)            return    logger.warning(        "subagent: delegation entry %s missing on update — final %s "        "state will not be rendered",        entry_id,        status,    )async def _delegate(    ctx: Context, *, sub_agent_name: str, agent: FunctionAgent, task: str) -> dict[str, Any]:    """Append a running entry, run the sub-agent, finalize the entry."""    entry_id = await _append_running_delegation(        ctx, sub_agent=sub_agent_name, task=task    )    try:        result = await _invoke_sub_agent(agent, task)    except _SubAgentError as exc:        await _finalize_delegation(            ctx, entry_id=entry_id, status="failed", result=str(exc)        )        return {"status": "failed", "error": str(exc)}    await _finalize_delegation(        ctx, entry_id=entry_id, status="completed", result=result    )    return {"status": "completed", "result": result}# ---------------------------------------------------------------------------# Supervisor tools — each delegates to one sub-agent.# ---------------------------------------------------------------------------async def research_agent(    ctx: Context,    task: Annotated[        str,        "Research brief — the topic / question to gather facts on.",    ],) -> str:    """Delegate a research task to the research sub-agent.    Use for: gathering facts, background, definitions, statistics.    Returns a JSON-ish string of {status, result|error}.    """    outcome = await _delegate(        ctx,        sub_agent_name="research_agent",        agent=_research_sub,        task=task,    )    return _stringify_outcome(outcome)async def writing_agent(    ctx: Context,    task: Annotated[        str,        "Writing brief — include relevant facts from prior research.",    ],) -> str:    """Delegate a drafting task to the writing sub-agent.    Use for: producing a polished paragraph, draft, or summary. Pass    relevant facts from prior research inside `task`.    """    outcome = await _delegate(        ctx,        sub_agent_name="writing_agent",        agent=_writing_sub,        task=task,    )    return _stringify_outcome(outcome)async def critique_agent(    ctx: Context,    task: Annotated[str, "The draft to critique."],) -> str:    """Delegate a critique task to the critique sub-agent.    Use for: reviewing a draft and suggesting concrete improvements.    """    outcome = await _delegate(        ctx,        sub_agent_name="critique_agent",        agent=_critique_sub,        task=task,    )    return _stringify_outcome(outcome)

This is where CopilotKit's shared-state channel earns its keep: the supervisor's tool calls mutate delegations as they happen, and the frontend renders every new entry live.

Rendering a live delegation log#

On the frontend, the delegation log is just a reactive render of the delegations slot. Subscribe with useAgent({ updates: [UseAgentUpdate.OnStateChanged, UseAgentUpdate.OnRunStatusChanged] }), read agent.state.delegations, and render one card per entry.

delegation-log.tsx
/** * Live delegation log — renders the `delegations` slot of agent state. * * Each entry corresponds to one invocation of a sub-agent. The list grows * in real time as the supervisor fans work out to its children. The * header shows how many sub-agents have been called and whether the * supervisor is still running. */export function DelegationLog({ delegations, isRunning }: DelegationLogProps) {  return (    <div      data-testid="delegation-log"      className="w-full h-full flex flex-col bg-white rounded-2xl shadow-sm border border-[#DBDBE5] overflow-hidden"    >      <div className="flex items-center justify-between px-6 py-3 border-b border-[#E9E9EF] bg-[#FAFAFC]">        <div className="flex items-center gap-3">          <span className="text-lg font-semibold text-[#010507]">            Sub-agent delegations          </span>          {isRunning && (            <span              data-testid="supervisor-running"              className="inline-flex items-center gap-1.5 px-2 py-0.5 rounded-full border border-[#BEC2FF] bg-[#BEC2FF1A] text-[#010507] text-[10px] font-semibold uppercase tracking-[0.12em]"            >              <span className="w-1.5 h-1.5 rounded-full bg-[#010507] animate-pulse" />              Supervisor running            </span>          )}        </div>        <span          data-testid="delegation-count"          className="text-xs font-mono text-[#838389]"        >          {delegations.length} calls        </span>      </div>      <div className="flex-1 overflow-y-auto p-4 space-y-3">        {delegations.length === 0 ? (          <p className="text-[#838389] italic text-sm">            Ask the supervisor to complete a task. Every sub-agent it calls will            appear here.          </p>        ) : (          delegations.map((d, idx) => (            <article              key={d.id}              data-testid="delegation-entry"              data-status={d.status}              className={`rounded-xl border p-3 ${ENTRY_BG_BY_STATUS[d.status]}`}            >              <div className="flex items-center justify-between mb-2">                <div className="flex items-center gap-2">                  <span className="text-xs font-mono text-[#AFAFB7]">                    #{idx + 1}                  </span>                  <span                    className={`text-[11px] font-medium px-2 py-0.5 rounded-full border ${SUB_AGENT_BADGE_CLASS[d.sub_agent]}`}                  >                    {SUB_AGENT_LABEL[d.sub_agent]}                  </span>                </div>                <span                  className={`text-[10px] font-medium px-2 py-0.5 rounded-full border uppercase tracking-[0.1em] ${STATUS_BADGE_CLASS[d.status]}`}                >                  {STATUS_LABEL[d.status]}                </span>              </div>              <div className="text-xs text-[#57575B] mb-2">                <span className="font-semibold text-[#010507]">Task: </span>                {d.task}              </div>              {d.status === "running" ? (                <div className="flex items-center gap-2 text-xs text-amber-700">                  <span                    className="inline-block w-3 h-3 rounded-full border-2 border-amber-500 border-t-transparent animate-spin"                    aria-hidden                  />                  Sub-agent is working…                </div>              ) : (                <div                  className={`text-sm whitespace-pre-wrap rounded-lg p-2.5 border ${                    d.status === "failed"                      ? "text-red-700 bg-white border-red-100"                      : "text-[#010507] bg-white border-[#E9E9EF]"                  }`}                >                  {d.result}                </div>              )}            </article>          ))        )}      </div>    </div>  );}

The result: as the supervisor fans work out to its sub-agents, the log grows in real time, giving the user visibility into a process that would otherwise be a long opaque spinner.

  • Shared State — the channel that makes the delegation log live.
  • State streaming — stream individual sub-agent outputs token-by-token inside each log entry.