CopilotKit

Events

The AG-UI protocol uses a comprehensive event system for agent-UI communication. All events extend BaseEvent, which is declared as a sealed class, and are strongly typed for compile-time safety.

BaseEvent

The base class for all protocol events:

/// Common supertype of every protocol event.
///
/// Declared `sealed`, so it cannot be instantiated directly and its direct
/// subtypes must be declared in the same library.
sealed class BaseEvent {
  /// Event type string (presumably the discriminator for the concrete
  /// event kind — confirm against the implementation).
  final String type;
  /// Time associated with the event.
  final DateTime timestamp;
  /// Optional key/value data attached to the event; `null` when absent.
  final Map<String, dynamic>? metadata;

  /// Creates an event. [type] and [timestamp] are required; [metadata] may
  /// be omitted.
  const BaseEvent({
    required this.type,
    required this.timestamp,
    this.metadata,
  });
}

Lifecycle Events

Track the execution lifecycle of agent runs and steps.

RunStartedEvent

Signals the beginning of an agent run:

/// Signals the beginning of an agent run.
class RunStartedEvent extends BaseEvent {
  /// Identifier of the run that is starting.
  final String runId;
  /// Optional thread identifier; `null` when not supplied.
  final String? threadId;
  /// Optional input payload for the run; `null` when not supplied.
  final Map<String, dynamic>? input;

  /// Creates the event; only [runId] is required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const RunStartedEvent({
    required this.runId,
    this.threadId,
    this.input,
    DateTime? timestamp,
  });
}

RunFinishedEvent

Signals the completion of an agent run:

/// Signals the completion of an agent run.
class RunFinishedEvent extends BaseEvent {
  /// Identifier of the run that finished.
  final String runId;
  /// Optional error text (presumably `null` for a successful run — confirm).
  final String? error;
  /// Optional output payload of the run; `null` when not supplied.
  final Map<String, dynamic>? output;
  /// Optional run duration; `null` when not supplied.
  final Duration? duration;

  /// Creates the event; only [runId] is required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const RunFinishedEvent({
    required this.runId,
    this.error,
    this.output,
    this.duration,
    DateTime? timestamp,
  });
}

StepStartedEvent

Marks the beginning of a processing step:

/// Marks the beginning of a processing step.
class StepStartedEvent extends BaseEvent {
  /// Identifier of the step that is starting.
  final String stepId;
  /// Identifier of the run this step belongs to.
  final String runId;
  /// Kind of step being processed (free-form string in this declaration).
  final String stepType;
  /// Optional identifier of a parent step (presumably for nested steps —
  /// confirm); `null` when not supplied.
  final String? parentStepId;

  /// Creates the event; [stepId], [runId] and [stepType] are required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const StepStartedEvent({
    required this.stepId,
    required this.runId,
    required this.stepType,
    this.parentStepId,
    DateTime? timestamp,
  });
}

StepFinishedEvent

Marks the completion of a processing step:

/// Marks the completion of a processing step.
class StepFinishedEvent extends BaseEvent {
  /// Identifier of the step that finished.
  final String stepId;
  /// Identifier of the run this step belongs to.
  final String runId;
  /// Optional error text; the usage example on this page treats a non-null
  /// value as a failed step.
  final String? error;
  /// Optional output payload of the step; `null` when not supplied.
  final Map<String, dynamic>? output;

  /// Creates the event; [stepId] and [runId] are required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const StepFinishedEvent({
    required this.stepId,
    required this.runId,
    this.error,
    this.output,
    DateTime? timestamp,
  });
}

Example Usage

// Follow a run from start to finish, reacting only to lifecycle events.
await for (final event in client.runAgent('agent', input)) {
  switch (event) {
    case RunStartedEvent(:final runId):
      print('Started run: $runId');
      startTimer();

    case StepStartedEvent(:final stepType):
      print('Processing: $stepType');

    // The null-check pattern matches only steps that carry an error;
    // successful steps fall through to `default` and print nothing.
    case StepFinishedEvent(:final error?):
      print('Step failed: $error');

    case RunFinishedEvent(:final duration):
      print('Completed in ${duration?.inSeconds}s');
      stopTimer();

    // BaseEvent is sealed, so a switch statement over it must be
    // exhaustive (assuming runAgent yields BaseEvent). Events this example
    // does not care about are ignored explicitly.
    default:
      break;
  }
}

Text Message Events

Handle streaming text responses from the assistant.

TextMessageStartedEvent

Indicates the start of a text message:

/// Indicates the start of a text message.
class TextMessageStartedEvent extends BaseEvent {
  /// Identifier of the message being started.
  final String messageId;
  /// Optional role of the message author; `null` when not supplied.
  final String? role;
  /// Optional key/value data for the message.
  ///
  /// NOTE(review): this redeclares the `metadata` field that [BaseEvent]
  /// already defines with the same type — confirm whether that is intended.
  final Map<String, dynamic>? metadata;

  /// Creates the event; only [messageId] is required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const TextMessageStartedEvent({
    required this.messageId,
    this.role,
    this.metadata,
    DateTime? timestamp,
  });
}

TextMessageDeltaEvent

Streams incremental text content:

/// Streams one incremental piece of text content for a message.
class TextMessageDeltaEvent extends BaseEvent {
  /// Identifier of the message this fragment belongs to.
  final String messageId;
  /// The text fragment carried by this event.
  final String delta;
  /// Optional position of the fragment (units and origin are not shown
  /// here — confirm); `null` when not supplied.
  final int? position;

  /// Creates the event; [messageId] and [delta] are required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const TextMessageDeltaEvent({
    required this.messageId,
    required this.delta,
    this.position,
    DateTime? timestamp,
  });
}

TextMessageFinishedEvent

Signals message completion:

/// Signals that a text message is complete.
class TextMessageFinishedEvent extends BaseEvent {
  /// Identifier of the message that finished.
  final String messageId;
  /// The complete text of the message.
  final String fullText;
  /// Token count for the message (how tokens are counted is not shown
  /// here — confirm).
  final int tokenCount;

  /// Creates the event; [messageId], [fullText] and [tokenCount] are all
  /// required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const TextMessageFinishedEvent({
    required this.messageId,
    required this.fullText,
    required this.tokenCount,
    DateTime? timestamp,
  });
}

Example: Streaming Text

// Accumulates the deltas of the message currently being streamed.
final buffer = StringBuffer();
// Id of the message being streamed; null until the first start event.
String? currentMessageId;

await for (final event in stream) {
  switch (event) {
    case TextMessageStartedEvent(:final messageId):
      // A new message begins: forget any text from the previous one.
      currentMessageId = messageId;
      buffer.clear();
      showTypingIndicator();

    case TextMessageDeltaEvent(:final delta, :final messageId):
      // Ignore fragments that belong to a different message.
      if (messageId == currentMessageId) {
        buffer.write(delta);
        updateUI(buffer.toString());
      }

    case TextMessageFinishedEvent(:final fullText):
      hideTypingIndicator();
      finalizeMessage(fullText);

    // BaseEvent is sealed, so a switch statement over it must be
    // exhaustive (assuming `stream` is a Stream<BaseEvent>). Non-text
    // events are ignored explicitly.
    default:
      break;
  }
}

Tool Call Events

Track tool/function invocations by the agent.

ToolCallStartedEvent

Signals the start of a tool call:

/// Signals the start of a tool call.
class ToolCallStartedEvent extends BaseEvent {
  /// Identifier of this tool call.
  final String toolCallId;
  /// Name of the tool being called.
  final String name;
  /// Arguments passed to the tool, keyed by string.
  final Map<String, dynamic> arguments;

  /// Creates the event; [toolCallId], [name] and [arguments] are required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const ToolCallStartedEvent({
    required this.toolCallId,
    required this.name,
    required this.arguments,
    DateTime? timestamp,
  });
}

ToolCallProgressEvent

Reports progress during tool execution:

/// Reports progress during tool execution.
class ToolCallProgressEvent extends BaseEvent {
  /// Identifier of the tool call this progress report belongs to.
  final String toolCallId;
  /// Completed fraction of the tool call.
  final double progress;  // 0.0 to 1.0
  /// Optional human-readable progress message; `null` when not supplied.
  final String? message;

  /// Creates the event; [toolCallId] and [progress] are required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const ToolCallProgressEvent({
    required this.toolCallId,
    required this.progress,
    this.message,
    DateTime? timestamp,
  });
}

ToolCallFinishedEvent

Signals tool call completion:

/// Signals that a tool call has completed.
class ToolCallFinishedEvent extends BaseEvent {
  /// Identifier of the tool call that finished.
  final String toolCallId;
  /// Result of the tool call; untyped (`dynamic`), so its shape depends on
  /// the tool.
  final dynamic result;
  /// Optional error text; the usage example on this page treats a non-null
  /// value as a failed call.
  final String? error;
  /// Optional duration of the call; `null` when not supplied.
  final Duration? duration;

  /// Creates the event; [toolCallId] and [result] are required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const ToolCallFinishedEvent({
    required this.toolCallId,
    required this.result,
    this.error,
    this.duration,
    DateTime? timestamp,
  });
}

Example: Tool Tracking

// Tool calls that have started but not yet finished, keyed by call id.
final activeTools = <String, ToolCallInfo>{};

await for (final event in stream) {
  switch (event) {
    case ToolCallStartedEvent(:final toolCallId, :final name):
      activeTools[toolCallId] = ToolCallInfo(name: name);
      print('⚡ Calling $name...');

    // Only the fraction is needed here, so bind nothing else.
    case ToolCallProgressEvent(:final progress):
      final percentage = (progress * 100).toStringAsFixed(0);
      print('  Progress: $percentage%');

    case ToolCallFinishedEvent(:final toolCallId, :final error):
      // Stop tracking the call; `tool` is null if no start event was seen.
      final tool = activeTools.remove(toolCallId);
      if (error != null) {
        print('❌ ${tool?.name} failed: $error');
      } else {
        print('✅ ${tool?.name} completed');
      }

    // BaseEvent is sealed, so a switch statement over it must be
    // exhaustive (assuming `stream` is a Stream<BaseEvent>). Events that
    // are not tool calls are ignored explicitly.
    default:
      break;
  }
}

State Management Events

Handle agent state updates and synchronization.

StateSnapshotEvent

Provides a complete state snapshot:

/// Provides a complete state snapshot.
class StateSnapshotEvent extends BaseEvent {
  /// The full state, keyed by string.
  final Map<String, dynamic> state;
  /// Optional checkpoint identifier; `null` when not supplied.
  final String? checkpointId;

  /// Creates the event; only [state] is required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const StateSnapshotEvent({
    required this.state,
    this.checkpointId,
    DateTime? timestamp,
  });
}

StateDeltaEvent

Provides incremental state updates using JSON Patch:

/// Provides incremental state updates as a list of JSON Patch operations.
class StateDeltaEvent extends BaseEvent {
  /// The patch operations carried by this event (presumably to be applied
  /// in list order — confirm).
  final List<JsonPatch> patches;
  /// Optional checkpoint identifier; `null` when not supplied.
  final String? checkpointId;

  /// Creates the event; only [patches] is required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const StateDeltaEvent({
    required this.patches,
    this.checkpointId,
    DateTime? timestamp,
  });
}

/// A single JSON Patch operation, as carried by a state delta.
class JsonPatch {
  /// Operation name.
  final String op;  // 'add', 'remove', 'replace', 'move', 'copy', 'test'
  /// Target location of the operation.
  final String path;
  /// Operand value; untyped and optional (presumably unused by operations
  /// such as 'remove' — confirm).
  final dynamic value;
  /// Optional source location (presumably for 'move' and 'copy' — confirm).
  final String? from;

  /// Creates an operation; [op] and [path] are required.
  const JsonPatch({
    required this.op,
    required this.path,
    this.value,
    this.from,
  });
}

MessagesSnapshotEvent

Provides conversation history:

/// Provides the conversation history as a list of messages.
class MessagesSnapshotEvent extends BaseEvent {
  /// The messages in the snapshot.
  final List<Message> messages;
  /// Optional thread identifier; `null` when not supplied.
  final String? threadId;

  /// Creates the event; only [messages] is required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const MessagesSnapshotEvent({
    required this.messages,
    this.threadId,
    DateTime? timestamp,
  });
}

Example: State Management

// Latest known agent state; replaced by snapshots and updated by deltas.
var currentState = <String, dynamic>{};
// Local copy of the conversation history.
final messages = <Message>[];

await for (final event in stream) {
  switch (event) {
    case StateSnapshotEvent(:final state):
      // Copy so later changes never touch the event's own map.
      currentState = {...state};
      updateStateUI(currentState);

    case StateDeltaEvent(:final patches):
      for (final patch in patches) {
        currentState = applyPatch(currentState, patch);
      }
      updateStateUI(currentState);

    // Bind the payload under a different name: `:final messages` would
    // shadow the local list above, and there is no `this` to reach it
    // through in this scope.
    case MessagesSnapshotEvent(messages: final snapshot):
      messages
        ..clear()
        ..addAll(snapshot);
      updateConversationUI(messages);

    // BaseEvent is sealed, so a switch statement over it must be
    // exhaustive (assuming `stream` is a Stream<BaseEvent>). Events that
    // do not affect state are ignored explicitly.
    default:
      break;
  }
}

Special Events

Handle raw data and custom event types.

RawEvent

For custom or unrecognized events:

/// Carries a custom or unrecognized event.
class RawEvent extends BaseEvent {
  /// Type name of the wrapped event (declared separately from the inherited
  /// `type` field).
  final String eventType;
  /// Payload of the wrapped event, keyed by string.
  final Map<String, dynamic> data;

  /// Creates the event; [eventType] and [data] are required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const RawEvent({
    required this.eventType,
    required this.data,
    DateTime? timestamp,
  });
}

ErrorEvent

For error notifications:

/// Carries an error notification.
class ErrorEvent extends BaseEvent {
  /// Error code, as a string.
  final String code;
  /// Error message text.
  final String message;
  /// Optional additional error details; `null` when not supplied.
  final Map<String, dynamic>? details;

  /// Creates the event; [code] and [message] are required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const ErrorEvent({
    required this.code,
    required this.message,
    this.details,
    DateTime? timestamp,
  });
}

MetadataEvent

For metadata updates:

/// Carries a metadata update.
class MetadataEvent extends BaseEvent {
  /// The metadata payload.
  ///
  /// NOTE(review): this redeclares the `metadata` field inherited from
  /// [BaseEvent], narrowing it from nullable to non-nullable — confirm that
  /// is intended.
  final Map<String, dynamic> metadata;
  /// Optional scope of the update (meaning not shown here — confirm);
  /// `null` when not supplied.
  final String? scope;

  /// Creates the event; only [metadata] is required.
  ///
  /// NOTE(review): no `super(...)` initializer is shown although [BaseEvent]
  /// requires `type` and `timestamp`, so the `timestamp` argument is not
  /// visibly forwarded — presumably elided in this listing; confirm.
  const MetadataEvent({
    required this.metadata,
    this.scope,
    DateTime? timestamp,
  });
}

Event Handling Patterns

Complete Handler

Handle all event types comprehensively:

/// Dispatches every kind of [BaseEvent] to a dedicated `on…` callback.
///
/// The callbacks themselves are not part of this listing; they are assumed
/// to be declared on this class or otherwise in scope.
class EventHandler {
  /// Routes [event] to the callback for its concrete type.
  ///
  /// [BaseEvent] is sealed, so this switch must list every subtype; the
  /// compiler rejects it if one is missing. No `default` is used, so a newly
  /// added event type produces a compile error here rather than being
  /// silently dropped.
  void handleEvent(BaseEvent event) {
    switch (event) {
      // Lifecycle
      case RunStartedEvent():
        onRunStarted(event);
      case RunFinishedEvent():
        onRunFinished(event);
      case StepStartedEvent():
        onStepStarted(event);
      case StepFinishedEvent():
        onStepFinished(event);

      // Messages
      case TextMessageStartedEvent():
        onMessageStarted(event);
      case TextMessageDeltaEvent():
        onMessageDelta(event);
      case TextMessageFinishedEvent():
        onMessageFinished(event);

      // Tool calls
      case ToolCallStartedEvent():
        onToolCallStarted(event);
      case ToolCallProgressEvent():
        onToolCallProgress(event);
      case ToolCallFinishedEvent():
        onToolCallFinished(event);

      // State
      case StateSnapshotEvent():
        onStateSnapshot(event);
      case StateDeltaEvent():
        onStateDelta(event);
      case MessagesSnapshotEvent():
        onMessagesSnapshot(event);

      // Special
      case ErrorEvent():
        onError(event);
      case RawEvent():
        onRawEvent(event);
      // Previously missing, which left the switch non-exhaustive.
      case MetadataEvent():
        onMetadata(event);
    }
  }
}

Stream Transformations

Transform event streams for specific use cases:

/// Convenience views over a stream of protocol events.
extension EventStreamExtensions on Stream<BaseEvent> {
  /// Keeps only the events that are a [T], typed as `Stream<T>`.
  ///
  /// `Stream` itself has no `whereType` (that method belongs to `Iterable`),
  /// so this filters with `where` and narrows the element type with `cast`,
  /// using nothing beyond the core library.
  Stream<T> _only<T extends BaseEvent>() => where((e) => e is T).cast<T>();

  /// Extract only text content
  Stream<String> get textContent =>
      _only<TextMessageDeltaEvent>().map((e) => e.delta);

  /// Get completed messages
  Stream<String> get completedMessages =>
      _only<TextMessageFinishedEvent>().map((e) => e.fullText);

  /// Track tool calls
  Stream<ToolCallResult> get toolResults =>
      _only<ToolCallFinishedEvent>().map((e) => ToolCallResult(
            id: e.toolCallId,
            result: e.result,
            error: e.error,
          ));

  /// Filter errors
  Stream<ErrorEvent> get errors => _only<ErrorEvent>();
}

// Usage
final textStream = eventStream.textContent;
final errors = eventStream.errors;

Event Aggregation

Collect related events:

/// Collects the deltas of each text message and reports the assembled text
/// once the message finishes.
class MessageAggregator {
  /// Text accumulated so far for each in-flight message, keyed by message id.
  final _messages = <String, StringBuffer>{};

  /// Feeds one [event] into the aggregator.
  ///
  /// A start event opens a buffer, a delta appends to it, and a finish event
  /// removes it and passes the assembled text to [onCompleteMessage]. Deltas
  /// and finish events for an id with no open buffer are ignored, as are all
  /// non-text events.
  void processEvent(BaseEvent event) {
    switch (event) {
      case TextMessageStartedEvent(:final messageId):
        _messages[messageId] = StringBuffer();

      case TextMessageDeltaEvent(:final messageId, :final delta):
        _messages[messageId]?.write(delta);

      case TextMessageFinishedEvent(:final messageId):
        final content = _messages.remove(messageId)?.toString();
        if (content != null) {
          onCompleteMessage(messageId, content);
        }

      // BaseEvent is sealed, so a switch statement over it must be
      // exhaustive; every non-text event is ignored explicitly.
      default:
        break;
    }
  }

  /// Called with the message [id] and its assembled [content] when a message
  /// finishes.
  void onCompleteMessage(String id, String content) {
    // Handle complete message
  }
}

Testing Events

Create mock events for testing:

// Test event factory
/// Factory helpers that build mock events for tests.
class TestEvents {
  /// Returns a run-started event for [runId], defaulting to `'test-run'`.
  static RunStartedEvent runStarted([String? runId]) =>
      RunStartedEvent(
        runId: runId ?? 'test-run',
        timestamp: DateTime.now(),
      );

  /// Returns a delta event carrying [text] for [messageId], defaulting to
  /// `'test-msg'`.
  static TextMessageDeltaEvent textDelta(String text, [String? messageId]) =>
      TextMessageDeltaEvent(
        messageId: messageId ?? 'test-msg',
        delta: text,
        timestamp: DateTime.now(),
      );

  /// Emits a short scripted run of six events: run start, message start,
  /// two deltas, message finish, run finish.
  ///
  /// All message events share one message id, and the deltas concatenate to
  /// exactly the `fullText` of the finish event, so consumers that
  /// correlate by id or reassemble text see a consistent stream.
  static Stream<BaseEvent> mockStream() async* {
    const messageId = 'msg1';
    const fullText = 'Hello world';

    yield runStarted();
    yield TextMessageStartedEvent(messageId: messageId);

    final words = fullText.split(' ');
    for (var i = 0; i < words.length; i++) {
      // Re-insert the separator after every word except the last, so no
      // trailing space is streamed.
      final isLast = i == words.length - 1;
      yield textDelta(isLast ? words[i] : '${words[i]} ', messageId);
      await Future<void>.delayed(const Duration(milliseconds: 100));
    }

    yield TextMessageFinishedEvent(
      messageId: messageId,
      fullText: fullText,
      tokenCount: 2,
    );
    yield RunFinishedEvent(runId: 'test-run');
  }
}

// Test usage
test('handles text streaming', () async {
  // Drain the scripted stream into a list before asserting on it.
  final collected = await TestEvents.mockStream().toList();

  // Six events in total, bracketed by the run start/finish pair.
  expect(collected, hasLength(6));
  expect(collected.first, isA<RunStartedEvent>());
  expect(collected.last, isA<RunFinishedEvent>());
});