Events
The AG-UI protocol uses a comprehensive event system for agent-UI communication. All events extend from BaseEvent and are strongly typed for compile-time safety.
BaseEvent
The base class for all protocol events:
sealed class BaseEvent {
final String type;
final DateTime timestamp;
final Map<String, dynamic>? metadata;
const BaseEvent({
required this.type,
required this.timestamp,
this.metadata,
});
}Lifecycle Events
Track the execution lifecycle of agent runs and steps.
RunStartedEvent
Signals the beginning of an agent run:
class RunStartedEvent extends BaseEvent {
final String runId;
final String? threadId;
final Map<String, dynamic>? input;
const RunStartedEvent({
required this.runId,
this.threadId,
this.input,
DateTime? timestamp,
});
}RunFinishedEvent
Signals the completion of an agent run:
class RunFinishedEvent extends BaseEvent {
final String runId;
final String? error;
final Map<String, dynamic>? output;
final Duration? duration;
const RunFinishedEvent({
required this.runId,
this.error,
this.output,
this.duration,
DateTime? timestamp,
});
}StepStartedEvent
Marks the beginning of a processing step:
class StepStartedEvent extends BaseEvent {
final String stepId;
final String runId;
final String stepType;
final String? parentStepId;
const StepStartedEvent({
required this.stepId,
required this.runId,
required this.stepType,
this.parentStepId,
DateTime? timestamp,
});
}StepFinishedEvent
Marks the completion of a processing step:
class StepFinishedEvent extends BaseEvent {
final String stepId;
final String runId;
final String? error;
final Map<String, dynamic>? output;
const StepFinishedEvent({
required this.stepId,
required this.runId,
this.error,
this.output,
DateTime? timestamp,
});
}Example Usage
await for (final event in client.runAgent('agent', input)) {
switch (event) {
case RunStartedEvent(:final runId):
print('Started run: $runId');
startTimer();
case StepStartedEvent(:final stepType):
print('Processing: $stepType');
case StepFinishedEvent(:final error):
if (error != null) {
print('Step failed: $error');
}
case RunFinishedEvent(:final duration):
print('Completed in ${duration?.inSeconds}s');
stopTimer();
}
}Text Message Events
Handle streaming text responses from the assistant.
TextMessageStartedEvent
Indicates the start of a text message:
class TextMessageStartedEvent extends BaseEvent {
final String messageId;
final String? role;
final Map<String, dynamic>? metadata;
const TextMessageStartedEvent({
required this.messageId,
this.role,
this.metadata,
DateTime? timestamp,
});
}TextMessageDeltaEvent
Streams incremental text content:
class TextMessageDeltaEvent extends BaseEvent {
final String messageId;
final String delta;
final int? position;
const TextMessageDeltaEvent({
required this.messageId,
required this.delta,
this.position,
DateTime? timestamp,
});
}TextMessageFinishedEvent
Signals message completion:
class TextMessageFinishedEvent extends BaseEvent {
final String messageId;
final String fullText;
final int tokenCount;
const TextMessageFinishedEvent({
required this.messageId,
required this.fullText,
required this.tokenCount,
DateTime? timestamp,
});
}Example: Streaming Text
final buffer = StringBuffer();
String? currentMessageId;
await for (final event in stream) {
switch (event) {
case TextMessageStartedEvent(:final messageId):
currentMessageId = messageId;
buffer.clear();
showTypingIndicator();
case TextMessageDeltaEvent(:final delta, :final messageId):
if (messageId == currentMessageId) {
buffer.write(delta);
updateUI(buffer.toString());
}
case TextMessageFinishedEvent(:final fullText):
hideTypingIndicator();
finalizeMessage(fullText);
}
}Tool Call Events
Track tool/function invocations by the agent.
ToolCallStartedEvent
Signals the start of a tool call:
class ToolCallStartedEvent extends BaseEvent {
final String toolCallId;
final String name;
final Map<String, dynamic> arguments;
const ToolCallStartedEvent({
required this.toolCallId,
required this.name,
required this.arguments,
DateTime? timestamp,
});
}ToolCallProgressEvent
Reports progress during tool execution:
class ToolCallProgressEvent extends BaseEvent {
final String toolCallId;
final double progress; // 0.0 to 1.0
final String? message;
const ToolCallProgressEvent({
required this.toolCallId,
required this.progress,
this.message,
DateTime? timestamp,
});
}ToolCallFinishedEvent
Signals tool call completion:
class ToolCallFinishedEvent extends BaseEvent {
final String toolCallId;
final dynamic result;
final String? error;
final Duration? duration;
const ToolCallFinishedEvent({
required this.toolCallId,
required this.result,
this.error,
this.duration,
DateTime? timestamp,
});
}Example: Tool Tracking
final activeTools = <String, ToolCallInfo>{};
await for (final event in stream) {
switch (event) {
case ToolCallStartedEvent(:final toolCallId, :final name):
activeTools[toolCallId] = ToolCallInfo(name: name);
print('⚡ Calling $name...');
case ToolCallProgressEvent(:final toolCallId, :final progress):
final percentage = (progress * 100).toStringAsFixed(0);
print(' Progress: $percentage%');
case ToolCallFinishedEvent(:final toolCallId, :final result, :final error):
final tool = activeTools.remove(toolCallId);
if (error != null) {
print('❌ ${tool?.name} failed: $error');
} else {
print('✅ ${tool?.name} completed');
}
}
}State Management Events
Handle agent state updates and synchronization.
StateSnapshotEvent
Provides a complete state snapshot:
class StateSnapshotEvent extends BaseEvent {
final Map<String, dynamic> state;
final String? checkpointId;
const StateSnapshotEvent({
required this.state,
this.checkpointId,
DateTime? timestamp,
});
}StateDeltaEvent
Provides incremental state updates using JSON Patch:
class StateDeltaEvent extends BaseEvent {
final List<JsonPatch> patches;
final String? checkpointId;
const StateDeltaEvent({
required this.patches,
this.checkpointId,
DateTime? timestamp,
});
}
class JsonPatch {
final String op; // 'add', 'remove', 'replace', 'move', 'copy', 'test'
final String path;
final dynamic value;
final String? from;
const JsonPatch({
required this.op,
required this.path,
this.value,
this.from,
});
}MessagesSnapshotEvent
Provides conversation history:
class MessagesSnapshotEvent extends BaseEvent {
final List<Message> messages;
final String? threadId;
const MessagesSnapshotEvent({
required this.messages,
this.threadId,
DateTime? timestamp,
});
}Example: State Management
var currentState = <String, dynamic>{};
final messages = <Message>[];
await for (final event in stream) {
switch (event) {
case StateSnapshotEvent(:final state):
currentState = Map.from(state);
updateStateUI(currentState);
case StateDeltaEvent(:final patches):
for (final patch in patches) {
currentState = applyPatch(currentState, patch);
}
updateStateUI(currentState);
case MessagesSnapshotEvent(:final messages):
this.messages
..clear()
..addAll(messages);
updateConversationUI(this.messages);
}
}Special Events
Handle raw data and custom event types.
RawEvent
For custom or unrecognized events:
class RawEvent extends BaseEvent {
final String eventType;
final Map<String, dynamic> data;
const RawEvent({
required this.eventType,
required this.data,
DateTime? timestamp,
});
}ErrorEvent
For error notifications:
class ErrorEvent extends BaseEvent {
final String code;
final String message;
final Map<String, dynamic>? details;
const ErrorEvent({
required this.code,
required this.message,
this.details,
DateTime? timestamp,
});
}MetadataEvent
For metadata updates:
class MetadataEvent extends BaseEvent {
final Map<String, dynamic> metadata;
final String? scope;
const MetadataEvent({
required this.metadata,
this.scope,
DateTime? timestamp,
});
}Event Handling Patterns
Complete Handler
Handle all event types comprehensively:
class EventHandler {
void handleEvent(BaseEvent event) {
switch (event) {
// Lifecycle
case RunStartedEvent():
onRunStarted(event);
case RunFinishedEvent():
onRunFinished(event);
case StepStartedEvent():
onStepStarted(event);
case StepFinishedEvent():
onStepFinished(event);
// Messages
case TextMessageStartedEvent():
onMessageStarted(event);
case TextMessageDeltaEvent():
onMessageDelta(event);
case TextMessageFinishedEvent():
onMessageFinished(event);
// Tool calls
case ToolCallStartedEvent():
onToolCallStarted(event);
case ToolCallProgressEvent():
onToolCallProgress(event);
case ToolCallFinishedEvent():
onToolCallFinished(event);
// State
case StateSnapshotEvent():
onStateSnapshot(event);
case StateDeltaEvent():
onStateDelta(event);
case MessagesSnapshotEvent():
onMessagesSnapshot(event);
// Special
case ErrorEvent():
onError(event);
case RawEvent():
onRawEvent(event);
}
}
}Stream Transformations
Transform event streams for specific use cases:
extension EventStreamExtensions on Stream<BaseEvent> {
/// Extract only text content
Stream<String> get textContent =>
whereType<TextMessageDeltaEvent>()
.map((e) => e.delta);
/// Get completed messages
Stream<String> get completedMessages =>
whereType<TextMessageFinishedEvent>()
.map((e) => e.fullText);
/// Track tool calls
Stream<ToolCallResult> get toolResults =>
whereType<ToolCallFinishedEvent>()
.map((e) => ToolCallResult(
id: e.toolCallId,
result: e.result,
error: e.error,
));
/// Filter errors
Stream<ErrorEvent> get errors =>
whereType<ErrorEvent>();
}
// Usage
final textStream = eventStream.textContent;
final errors = eventStream.errors;Event Aggregation
Collect related events:
class MessageAggregator {
final _messages = <String, StringBuffer>{};
void processEvent(BaseEvent event) {
switch (event) {
case TextMessageStartedEvent(:final messageId):
_messages[messageId] = StringBuffer();
case TextMessageDeltaEvent(:final messageId, :final delta):
_messages[messageId]?.write(delta);
case TextMessageFinishedEvent(:final messageId):
final content = _messages.remove(messageId)?.toString();
if (content != null) {
onCompleteMessage(messageId, content);
}
}
}
void onCompleteMessage(String id, String content) {
// Handle complete message
}
}Testing Events
Create mock events for testing:
// Test event factory
class TestEvents {
static RunStartedEvent runStarted([String? runId]) =>
RunStartedEvent(
runId: runId ?? 'test-run',
timestamp: DateTime.now(),
);
static TextMessageDeltaEvent textDelta(String text, [String? messageId]) =>
TextMessageDeltaEvent(
messageId: messageId ?? 'test-msg',
delta: text,
timestamp: DateTime.now(),
);
static Stream<BaseEvent> mockStream() async* {
yield runStarted();
yield TextMessageStartedEvent(messageId: 'msg1');
for (final word in 'Hello world'.split(' ')) {
yield textDelta('$word ');
await Future.delayed(Duration(milliseconds: 100));
}
yield TextMessageFinishedEvent(
messageId: 'msg1',
fullText: 'Hello world',
tokenCount: 2,
);
yield RunFinishedEvent(runId: 'test-run');
}
}
// Test usage
test('handles text streaming', () async {
final events = await TestEvents.mockStream().toList();
expect(events.length, equals(6));
expect(events.first, isA<RunStartedEvent>());
expect(events.last, isA<RunFinishedEvent>());
});