Predictive state updates
Stream in-progress agent state updates to the frontend.
This example demonstrates predictive state updates in the CopilotKit Feature Viewer.
What is this?#
Microsoft Agent Framework agents can stream state updates through AG-UI as tool arguments are generated by the LLM. CopilotKit surfaces these updates in the UI, enabling optimistic, real-time rendering. We call these predictive state updates.
When should I use this?#
Use predictive state updates when you want to:
- Keep users engaged during long-running operations
- Show step-by-step progress
- Build trust by exposing what the agent is doing now, not only at the end
- Enable agent steering (users can intervene if needed)
Source of truth
When the tool completes, the agent emits a final state snapshot. Any predictive updates should be reflected in that final state or they will be overwritten.
Implementation#
Define the state#
We will define an observed_steps array that is updated while the agent performs long-running tasks.
using System.Text.Json.Serialization;
public class AgentStateSnapshot
{
[JsonPropertyName("observed_steps")]
public List<string> ObservedSteps { get; set; } = new();
}STATE_SCHEMA: dict[str, object] = {
"observed_steps": {
"type": "array",
"items": {"type": "string"},
"description": "Array of completed steps"
}
}Emit the intermediate state (tool-based predictive updates)#
Configure AG-UI state management to treat tool arguments as predictive updates to observed_steps. As the LLM streams arguments for the tool call, AG-UI emits state delta events immediately.
using System.ComponentModel;
using System.Text.Json;
using System.Text.Json.Serialization;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Hosting.AGUI.AspNetCore;
using Microsoft.AspNetCore.Http.Json;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddAGUI();
// Register a source-generated serializer context for fast, typed JSON
builder.Services.ConfigureHttpJsonOptions(options =>
options.SerializerOptions.TypeInfoResolverChain.Add(AGUIDojoServerSerializerContext.Default));
var app = builder.Build();
string endpoint = builder.Configuration["AZURE_OPENAI_ENDPOINT"]!;
string deployment = builder.Configuration["AZURE_OPENAI_DEPLOYMENT_NAME"]!;
// Define a tool the LLM may call as it progresses to report partial steps
[Description("Report current step progress.")]
static string StepProgress([Description("Steps completed so far")] string[] steps)
=> "Progress received.";
// Create the base agent with the reporting tool
var baseAgent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
.GetChatClient(deployment)
.CreateAIAgent(
name: "AGUIAssistant",
instructions: "You are a helpful assistant that may call the 'step_progress' tool to report intermediate steps.",
tools: [AIFunctionFactory.Create(StepProgress)]);
// Wrap with a streaming middleware that emits interim state snapshots (typed, source-generated).
// See the "Stream state from your agent" section in the Agent State guide for a full example of a DelegatingAIAgent
// that reads streaming updates and emits DataContent with an AgentStateSnapshot.
var jsonOptions = app.Services.GetRequiredService<IOptions<JsonOptions>>();
AIAgent agent = new StateStreamingAgent(baseAgent, jsonOptions.Value.SerializerOptions);
app.MapAGUI("/", agent);
await app.RunAsync();
// Example: streaming agent wrapper emitting state snapshots (simplified)
internal sealed class StateStreamingAgent : DelegatingAIAgent
{
private readonly JsonSerializerOptions _jsonOptions;
public StateStreamingAgent(AIAgent inner, JsonSerializerOptions jsonOptions) : base(inner)
{
_jsonOptions = jsonOptions;
}
public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
IEnumerable<ChatMessage> messages,
AgentThread? thread = null,
AgentRunOptions? options = null,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var observedSteps = new List<string>();
await foreach (var update in this.InnerAgent.RunStreamingAsync(messages, thread, options, cancellationToken))
{
// Inspect streaming contents for function calls and collect step arguments as they arrive
foreach (var content in update.Contents)
{
if (content is FunctionCallContent f
&& string.Equals(f.Name, "step_progress", StringComparison.OrdinalIgnoreCase)
&& f.Arguments is JsonElement args)
{
if (args.TryGetProperty("steps", out var stepsElement))
{
if (stepsElement.Deserialize(_jsonOptions.GetTypeInfo(typeof(string[]))) is string[] steps)
{
observedSteps.Clear();
foreach (var s in steps)
{
observedSteps.Add(s);
}
// Emit a typed state snapshot into the AG‑UI stream
var snapshot = new AgentStateSnapshot { Steps = observedSteps };
byte[] stateBytes = JsonSerializer.SerializeToUtf8Bytes(
snapshot,
_jsonOptions.GetTypeInfo(typeof(AgentStateSnapshot)));
yield return new AgentRunResponseUpdate
{
Contents = [ new DataContent(stateBytes, "application/json") ]
};
}
}
}
}
// Always forward the original update (text deltas / final tool results, etc.)
yield return update;
}
}
}
// Typed state snapshot for source-generated JSON
internal sealed class AgentStateSnapshot
{
[JsonPropertyName("observed_steps")]
public List<string> Steps { get; set; } = new();
}
// Source-generated serializer context (register above via ConfigureHttpJsonOptions)
[JsonSerializable(typeof(AgentStateSnapshot))]
[JsonSerializable(typeof(string[]))]
internal sealed partial class AGUIDojoServerSerializerContext : JsonSerializerContext;from __future__ import annotations
from typing import Annotated
from agent_framework import Agent, SupportsChatGetResponse, tool
from agent_framework_ag_ui import AgentFrameworkAgent
from pydantic import Field
# 1) Define state schema for AG-UI
STATE_SCHEMA: dict[str, object] = {
"observed_steps": {
"type": "array",
"items": {"type": "string"},
"description": "Array of completed steps"
}
}
# 2) Predictive state mapping: observed_steps <- step_progress.steps
PREDICT_STATE_CONFIG: dict[str, dict[str, str]] = {
"observed_steps": {
"tool": "step_progress",
"tool_argument": "steps",
}
}
# 3) Tool that the LLM will call with step updates
@tool
def step_progress(
steps: Annotated[list[str], Field(description="Steps completed so far")]
) -> str:
return "Progress received."
def create_agent(chat_client: SupportsChatGetResponse) -> AgentFrameworkAgent:
base = Agent(
name="sample_agent",
instructions="You are a task performer. Report progress using step_progress.",
client=chat_client,
tools=[step_progress],
)
return AgentFrameworkAgent(
agent=base,
name="CopilotKitMicrosoftAgentFrameworkAgent",
description="Agent with predictive state updates for observed steps.",
state_schema=STATE_SCHEMA,
predict_state_config=PREDICT_STATE_CONFIG,
require_confirmation=False,
)With this configuration, AG-UI emits predictive state updates as soon as the model streams the tool arguments, without waiting for tool completion.
Observe predictions on the client#
Add a state renderer to observe the predicted observed_steps updates as they stream in.
type AgentState = {
observed_steps: string[];
};
export default function Page() {
// Access both predicted and final states
const { agent } = useAgent({ agentId: "sample_agent" });
// Observe predictions (render inside the chat)
useAgent({
agentId: "sample_agent",
render: ({ state }) => {
if (!state.observed_steps?.length) return null;
return (
<div>
<h3>Current Progress:</h3>
<ul>
{state.observed_steps.map((step, i) => (
<li key={i}>{step}</li>
))}
</ul>
</div>
);
},
});
return <div>...</div>;
}Give it a try!#
Ask the agent to perform a multi-step task (e.g., “write a short outline and report progress each step”). You’ll see observed_steps update in real time as the tool arguments stream in.
