CopilotKit

Predictive state updates

Stream in-progress agent state updates to the frontend.


This example demonstrates predictive state updates in the CopilotKit Feature Viewer.

What is this?#

Microsoft Agent Framework agents can stream state updates through AG-UI as tool arguments are generated by the LLM. CopilotKit surfaces these updates in the UI, enabling optimistic, real-time rendering. We call these predictive state updates.

When should I use this?#

Use predictive state updates when you want to:

  • Keep users engaged during long-running operations
  • Show step-by-step progress
  • Build trust by exposing what the agent is doing now, not only at the end
  • Enable agent steering (users can intervene if needed)

Source of truth

When the tool completes, the agent emits a final state snapshot. Any predictive updates should be reflected in that final state or they will be overwritten.

Implementation#

Define the state#

We will define an observed_steps array that is updated while the agent performs long-running tasks.

agent/Program.cs (excerpt)
using System.Text.Json.Serialization;
public class AgentStateSnapshot
{
    [JsonPropertyName("observed_steps")]
    public List<string> ObservedSteps { get; set; } = new();
}
agent/src/agent.py (excerpt)
STATE_SCHEMA: dict[str, object] = {
    "observed_steps": {
        "type": "array",
        "items": {"type": "string"},
        "description": "Array of completed steps"
    }
}

Emit the intermediate state (tool-based predictive updates)#

Configure AG-UI state management to treat tool arguments as predictive updates to observed_steps. As the LLM streams arguments for the tool call, AG-UI emits state delta events immediately.

agent/Program.cs (excerpt)
using System.ComponentModel;
using System.Text.Json;
using System.Text.Json.Serialization;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Hosting.AGUI.AspNetCore;
using Microsoft.AspNetCore.Http.Json;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddAGUI();
// Register a source-generated serializer context for fast, typed JSON
builder.Services.ConfigureHttpJsonOptions(options =>
    options.SerializerOptions.TypeInfoResolverChain.Add(AGUIDojoServerSerializerContext.Default));

var app = builder.Build();

string endpoint = builder.Configuration["AZURE_OPENAI_ENDPOINT"]!;
string deployment = builder.Configuration["AZURE_OPENAI_DEPLOYMENT_NAME"]!;

// Define a tool the LLM may call as it progresses to report partial steps
[Description("Report current step progress.")]
static string StepProgress([Description("Steps completed so far")] string[] steps)
    => "Progress received.";

// Create the base agent with the reporting tool
var baseAgent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetChatClient(deployment)
    .CreateAIAgent(
        name: "AGUIAssistant",
        instructions: "You are a helpful assistant that may call the 'step_progress' tool to report intermediate steps.",
        tools: [AIFunctionFactory.Create(StepProgress)]);

// Wrap with a streaming middleware that emits interim state snapshots (typed, source-generated).
// See the "Stream state from your agent" section in the Agent State guide for a full example of a DelegatingAIAgent
// that reads streaming updates and emits DataContent with an AgentStateSnapshot.
var jsonOptions = app.Services.GetRequiredService<IOptions<JsonOptions>>();
AIAgent agent = new StateStreamingAgent(baseAgent, jsonOptions.Value.SerializerOptions);

app.MapAGUI("/", agent);
await app.RunAsync();

// Example: streaming agent wrapper emitting state snapshots (simplified)
internal sealed class StateStreamingAgent : DelegatingAIAgent
{
    private readonly JsonSerializerOptions _jsonOptions;
    public StateStreamingAgent(AIAgent inner, JsonSerializerOptions jsonOptions) : base(inner)
    {
        _jsonOptions = jsonOptions;
    }

    public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
        IEnumerable<ChatMessage> messages,
        AgentThread? thread = null,
        AgentRunOptions? options = null,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var observedSteps = new List<string>();
        await foreach (var update in this.InnerAgent.RunStreamingAsync(messages, thread, options, cancellationToken))
        {
            // Inspect streaming contents for function calls and collect step arguments as they arrive
            foreach (var content in update.Contents)
            {
                if (content is FunctionCallContent f
                    && string.Equals(f.Name, "step_progress", StringComparison.OrdinalIgnoreCase)
                    && f.Arguments is JsonElement args)
                {
                    if (args.TryGetProperty("steps", out var stepsElement))
                    {
                        if (stepsElement.Deserialize(_jsonOptions.GetTypeInfo(typeof(string[]))) is string[] steps)
                        {
                            observedSteps.Clear();
                            foreach (var s in steps)
                            {
                                observedSteps.Add(s);
                            }
                            // Emit a typed state snapshot into the AG‑UI stream
                            var snapshot = new AgentStateSnapshot { Steps = observedSteps };
                            byte[] stateBytes = JsonSerializer.SerializeToUtf8Bytes(
                                snapshot,
                                _jsonOptions.GetTypeInfo(typeof(AgentStateSnapshot)));
                            yield return new AgentRunResponseUpdate
                            {
                                Contents = [ new DataContent(stateBytes, "application/json") ]
                            };
                        }
                    }
                }
            }

            // Always forward the original update (text deltas / final tool results, etc.)
            yield return update;
        }
    }
}

// Typed state snapshot for source-generated JSON
internal sealed class AgentStateSnapshot
{
    [JsonPropertyName("observed_steps")]
    public List<string> Steps { get; set; } = new();
}

// Source-generated serializer context (register above via ConfigureHttpJsonOptions)
[JsonSerializable(typeof(AgentStateSnapshot))]
[JsonSerializable(typeof(string[]))]
internal sealed partial class AGUIDojoServerSerializerContext : JsonSerializerContext;
agent/src/agent.py (excerpt)
from __future__ import annotations
from typing import Annotated
from agent_framework import Agent, SupportsChatGetResponse, tool
from agent_framework_ag_ui import AgentFrameworkAgent
from pydantic import Field

# 1) Define state schema for AG-UI
STATE_SCHEMA: dict[str, object] = {
    "observed_steps": {
        "type": "array",
        "items": {"type": "string"},
        "description": "Array of completed steps"
    }
}

# 2) Predictive state mapping: observed_steps <- step_progress.steps
PREDICT_STATE_CONFIG: dict[str, dict[str, str]] = {
    "observed_steps": {
        "tool": "step_progress",
        "tool_argument": "steps",
    }
}

# 3) Tool that the LLM will call with step updates
@tool
def step_progress(
    steps: Annotated[list[str], Field(description="Steps completed so far")]
) -> str:
    return "Progress received."

def create_agent(chat_client: SupportsChatGetResponse) -> AgentFrameworkAgent:
    base = Agent(
        name="sample_agent",
        instructions="You are a task performer. Report progress using step_progress.",
        client=chat_client,
        tools=[step_progress],
    )
    return AgentFrameworkAgent(
        agent=base,
        name="CopilotKitMicrosoftAgentFrameworkAgent",
        description="Agent with predictive state updates for observed steps.",
        state_schema=STATE_SCHEMA,
        predict_state_config=PREDICT_STATE_CONFIG,
        require_confirmation=False,
    )

With this configuration, AG-UI emits predictive state updates as soon as the model streams the tool arguments, without waiting for tool completion.

Observe predictions on the client#

Add a state renderer to observe the predicted observed_steps updates as they stream in.

ui/app/page.tsx

type AgentState = {
  observed_steps: string[];
};

export default function Page() {
  // Access both predicted and final states
  const { agent } = useAgent({ agentId: "sample_agent" });

  // Observe predictions (render inside the chat)
  useAgent({
    agentId: "sample_agent",
    render: ({ state }) => {
      if (!state.observed_steps?.length) return null;
      return (
        <div>
          <h3>Current Progress:</h3>
          <ul>
            {state.observed_steps.map((step, i) => (
              <li key={i}>{step}</li>
            ))}
          </ul>
        </div>
      );
    },
  });

  return <div>...</div>;
}

Give it a try!#

Ask the agent to perform a multi-step task (e.g., “write a short outline and report progress each step”). You’ll see observed_steps update in real time as the tool arguments stream in.