CopilotKit

State Streaming

Stream partial agent state updates to the UI while a tool call is still running.


"""Agent backing the State Streaming demo.

The agent writes a long `document` string into shared agent state via a
`write_document` tool. The UI renders `state["document"]` live as the
tool arguments arrive.

How the per-token "live" feel is produced:

1. `PredictStateMapping(state_key="document", tool="write_document",
   tool_argument="content", stream_tool_call=True)` is declared on the
   ADKAgent middleware in `registry.py`. The middleware emits a
   STATE_DELTA every time the corresponding tool argument grows.
2. `streaming_function_call_arguments=True` is also set on the ADKAgent
   middleware so ag_ui_adk subscribes to incremental TOOL_CALL_ARGS
   events from the underlying ADK runner. This requires google-adk
   >= 1.24.0 via Vertex AI for true per-token streaming; on older
   versions or via Gemini Studio the middleware emits a UserWarning at
   startup and falls back to chunk-level streaming, which still drives
   STATE_DELTAs but at coarser granularity. The UI's "LIVE" badge stays
   honest in both modes — it just updates fewer times per second on the
   fallback path.

The model itself does not need a `GenerateContentConfig` override for
this — the streaming behaviour is entirely controlled by the ADKAgent
middleware. This matches langgraph-python's StateStreamingMiddleware
setup.
"""

from __future__ import annotations

from ag_ui_adk import AGUIToolset
from ag_ui_adk.config import PredictStateMapping
from google.adk.agents import LlmAgent
from google.adk.tools import ToolContext

from agents.shared_chat import get_model, stop_on_terminal_text


def write_document(tool_context: ToolContext, document: str) -> dict:
    """Write a document into shared state.

    Whenever the user asks you to write or draft anything (essay, poem,
    email, summary, etc.), call this tool with the full content as a
    single string. The UI renders state["document"] live as you type.

    Argument name `document` mirrors langgraph-python's `write_document`
    signature so the shared D5 fixture (`tool_argument="document"`) and
    the LGP-aligned PredictStateMapping below stay in lock-step.
    """
    tool_context.state["document"] = document
    return {"status": "ok", "length": len(document)}


_INSTRUCTION = (
    "You are a collaborative writing assistant. Whenever the user asks "
    "you to write, draft, or revise any piece of text, ALWAYS call the "
    "`write_document` tool with the full content as a single string. "
    "Never paste the document into a chat message directly — the document "
    "belongs in shared state and the UI renders it live as you type."
)

shared_state_streaming_agent = LlmAgent(
    name="SharedStateStreamingAgent",
    model=get_model(),
    instruction=_INSTRUCTION,
    tools=[write_document, AGUIToolset()],
    after_model_callback=stop_on_terminal_text,
)


SHARED_STATE_STREAMING_PREDICT_STATE = [
    PredictStateMapping(
        state_key="document",
        tool="write_document",
        tool_argument="document",
        emit_confirm_tool=False,
        stream_tool_call=True,
    ),
]

What is this?#

By default, agent state only updates between LangGraph node transitions, so a long-running tool call (writing a full document, drafting an email) appears to the UI as one big burst at the end. For agent-native apps, that feels broken: users expect to watch the output materialise.

State streaming forwards the value of a specific tool argument straight into an agent state key as the argument is being generated. The UI, subscribed via useAgent, re-renders every token.

When should I use this?#

Use state streaming whenever a tool's output is long-form text or a growing structured value and you want the user to see it assemble in real time. Common shapes:

  • A collaborative writing agent that emits a document
  • A research agent that accumulates a list of findings
  • A planning agent that builds up a step-by-step plan

Without streaming, the user stares at a spinner. With streaming, they see the answer grow token-by-token.

The backend: one middleware, one StateItem#

The canonical pattern for prebuilt agents is StateStreamingMiddleware. It takes one or more StateItem(...) entries, each mapping a tool argument to a state key. When the LLM streams that argument, CopilotKit writes every partial value into shared state before the tool even finishes executing.

shared_state_streaming_agent.py
from __future__ import annotations

from ag_ui_adk import AGUIToolset
from ag_ui_adk.config import PredictStateMapping
from google.adk.agents import LlmAgent
from google.adk.tools import ToolContext

from agents.shared_chat import get_model, stop_on_terminal_text


def write_document(tool_context: ToolContext, document: str) -> dict:
    """Write a document into shared state.

    Whenever the user asks you to write or draft anything (essay, poem,
    email, summary, etc.), call this tool with the full content as a
    single string. The UI renders state["document"] live as you type.

    Argument name `document` mirrors langgraph-python's `write_document`
    signature so the shared D5 fixture (`tool_argument="document"`) and
    the LGP-aligned PredictStateMapping below stay in lock-step.
    """
    tool_context.state["document"] = document
    return {"status": "ok", "length": len(document)}


_INSTRUCTION = (
    "You are a collaborative writing assistant. Whenever the user asks "
    "you to write, draft, or revise any piece of text, ALWAYS call the "
    "`write_document` tool with the full content as a single string. "
    "Never paste the document into a chat message directly — the document "
    "belongs in shared state and the UI renders it live as you type."
)

shared_state_streaming_agent = LlmAgent(
    name="SharedStateStreamingAgent",
    model=get_model(),
    instruction=_INSTRUCTION,
    tools=[write_document, AGUIToolset()],
    after_model_callback=stop_on_terminal_text,
)


SHARED_STATE_STREAMING_PREDICT_STATE = [
    PredictStateMapping(
        state_key="document",
        tool="write_document",
        tool_argument="document",
        emit_confirm_tool=False,
        stream_tool_call=True,
    ),
]

A few things to note:

  • The state_key must exist on your AgentState schema (document: str in this demo).
  • The tool and tool_argument name the exact LLM-facing tool and argument to forward.
  • When the tool call completes, its final return value is written to the same key, so the streamed partial eventually becomes the authoritative final value.

The frontend: useAgent + OnStateChanged#

The UI side is identical to any other shared-state subscription: useAgent with OnStateChanged gives you a reactive agent.state. Add OnRunStatusChanged if you want a "LIVE" / "done" indicator.

page.tsx
  // Subscribe to BOTH state changes and run-status changes. The former  // drives the per-token document rerender; the latter toggles the  // "LIVE" badge when the agent starts / stops.  const { agent } = useAgent({    agentId: "shared-state-streaming",    updates: [UseAgentUpdate.OnStateChanged, UseAgentUpdate.OnRunStatusChanged],  });

From there, agent.state.document is just a string that grows on every token, and agent.isRunning tells you whether to show a streaming indicator.