CopilotKit

AbstractAgent

AbstractAgent is the base class that provides the foundation for implementing custom agent connectivity patterns. It defines the core contract and common functionality that all agent implementations must follow, making it the starting point for building specialized agent types.

Overview

AbstractAgent provides:

  • Core HTTP client configuration and management
  • Authentication handling
  • Request/response lifecycle hooks
  • Common utility methods
  • Standardized error handling patterns

Usage

Creating Custom Agent Types

/**
 * Example agent that decorates the outgoing request and post-processes every streamed event.
 *
 * The [RunAgentInput] is extended with a `customFlag` context entry before it is posted, and
 * the text delta of each [TextMessageContentEvent] is upper-cased before being re-emitted.
 */
class CustomAgent(
    url: String,
    configure: AgUiAgentConfig.() -> Unit = {}
) : AbstractAgent(url, configure) {

    /** Posts the pre-processed [input] to [agentUrl] and re-emits each event after post-processing. */
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        val request = preprocessInput(input)

        // NOTE(review): assumes the inherited client can decode the response body
        // directly as a Flow<BaseEvent> — confirm against the client setup.
        val events = httpClient.post(agentUrl) {
            contentType(ContentType.Application.Json)
            setBody(request)
        }.body<Flow<BaseEvent>>()

        events.collect { incoming -> emit(processEvent(incoming)) }
    }

    /** Returns a copy of [input] whose context additionally carries `customFlag`. */
    private fun preprocessInput(input: RunAgentInput): RunAgentInput =
        input.copy(
            context = input.context + mapOf("customFlag" to "true")
        )

    /** Upper-cases text deltas; every other event type passes through unchanged. */
    private fun processEvent(event: BaseEvent): BaseEvent =
        if (event is TextMessageContentEvent) {
            event.copy(delta = event.delta.uppercase())
        } else {
            event
        }
}

Specialized Agent Implementation

/**
 * Agent that buffers user messages and submits them together as a single run.
 *
 * Messages accumulate through [queueMessage]; [processBatch] converts the queue into one
 * [RunAgentInput], empties the queue and starts a run for it.
 */
class BatchAgent(
    url: String,
    configure: AgUiAgentConfig.() -> Unit = {}
) : AbstractAgent(url, configure) {

    private val messageQueue = mutableListOf<String>()

    /** Appends [message] to the pending batch. */
    fun queueMessage(message: String) {
        messageQueue.add(message)
    }

    /**
     * Wraps every queued message in a [UserMessage], clears the queue and runs the batch.
     *
     * @param threadId thread the batch run is attached to; defaults to `"batch"`.
     * @return the event stream produced by [run] for the batched input.
     */
    fun processBatch(threadId: String = "batch"): Flow<BaseEvent> {
        val messages = messageQueue.map { content ->
            UserMessage(id = generateId("user"), content = content)
        }

        val input = RunAgentInput(
            threadId = threadId,
            runId = generateRunId(),
            messages = messages
        )

        messageQueue.clear()
        return run(input)
    }

    /**
     * Adds `batchSize` and `batchId` context entries to [input] and executes the request.
     *
     * @param input the run input to tag with batch metadata.
     * @return the stream of protocol events for the tagged input.
     */
    override fun run(input: RunAgentInput): Flow<BaseEvent> {
        val taggedInput = input.copy(
            context = input.context + mapOf(
                "batchSize" to input.messages.size.toString(),
                // The inherited ID helper replaces java.util.UUID, which is JVM-only.
                "batchId" to generateId("batch")
            )
        )
        // `run` is abstract on AbstractAgent, so there is no super implementation to call.
        // NOTE(review): assumes processRequest is the inherited request helper — confirm.
        return processRequest(taggedInput)
    }
}

Configuration

AbstractAgent uses AgUiAgentConfig for configuration:

/**
 * Abstract agent that prints selected configuration values while it is being constructed.
 */
abstract class MyAgent(
    url: String,
    configure: AgUiAgentConfig.() -> Unit
) : AbstractAgent(url, configure) {

    init {
        // The inherited 'config' property becomes the receiver for the lookups below.
        with(config) {
            println("System prompt: ${systemPrompt}")
            println("Debug mode: ${debug}")
            println("Headers: ${headers}")
        }
    }
}

Core Methods

Abstract Methods

run

Must be implemented by subclasses to define request execution:

/**
 * Defines how a request is executed; every subclass must implement it.
 *
 * @param input complete AG-UI protocol input.
 * @return a stream of protocol events.
 */
abstract fun run(input: RunAgentInput): Flow<BaseEvent>

Parameters:

  • input: Complete AG-UI protocol input

Returns: Flow<BaseEvent> - Stream of protocol events

Protected Properties

httpClient

Pre-configured HTTP client with authentication:

/** Shows how a subclass issues its request through the inherited [httpClient]. */
class MyAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        // The inherited client posts the JSON-encoded input to the agent endpoint.
        val response = httpClient.post(agentUrl) {
            contentType(ContentType.Application.Json)
            setBody(input)
        }
        // Process response...
    }
}

config

Access to agent configuration:

/** Shows how values from the inherited [config] can drive per-request settings. */
class MyAgent : AbstractAgent(url, config) {
    /**
     * Builds a request customizer that applies the configured timeouts first and
     * then copies every configured header onto the request.
     */
    private fun customizeRequest(): HttpRequestBuilder.() -> Unit = {
        // Timeouts come straight from the agent configuration.
        timeout {
            requestTimeoutMillis = config.requestTimeout.inWholeMilliseconds
            connectTimeoutMillis = config.connectTimeout.inWholeMilliseconds
        }

        // Forward each configured header as-is.
        for ((name, value) in config.headers) {
            header(name, value)
        }
    }
}

agentUrl

The configured agent endpoint URL:

/** Shows that the configured endpoint is available to subclasses as [agentUrl]. */
class MyAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        // Printed on collection, because the flow builder body runs lazily.
        println("Connecting to: $agentUrl")
        // Make request to agentUrl
    }
}

Utility Methods

generateId

Generate unique IDs for messages and runs:

/** Shows the inherited [generateId] utility being used to build a message. */
class MyAgent : AbstractAgent(url, config) {
    /** Wraps [content] in a [UserMessage] whose id comes from the inherited ID utility. */
    private fun createMessage(content: String): UserMessage =
        UserMessage(id = generateId("user"), content = content)
}

generateRunId

Generate unique run identifiers:

/** Shows the inherited [generateRunId] utility filling in a missing run id. */
class MyAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> {
        // Keep the caller's run id when present; otherwise mint one for request tracking.
        val trackedInput = input.copy(runId = input.runId ?: generateRunId())
        return processRequest(trackedInput)
    }
}

HTTP Client Configuration

AbstractAgent automatically configures the HTTP client with:

Authentication

Depending on the configuration, the HTTP client is set up with:

  • Bearer token authentication
  • API key authentication
  • Basic authentication
  • Custom authentication providers

Platform-Specific Engines

  • Android: ktor-client-android
  • iOS: ktor-client-darwin
  • JVM: ktor-client-cio

Content Negotiation

  • JSON serialization with kotlinx.serialization
  • Automatic request/response handling

Logging

Optional request/response logging, enabled when config.debug is set to true.

Common Implementation Patterns

Request Preprocessing

/** Agent that stamps client metadata onto every request before executing it. */
class PreprocessingAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        // Stamping happens inside the builder, so requestTime is taken at collection time.
        val stamped = withClientMetadata(input)
        processRequest(stamped).collect { event -> emit(event) }
    }

    /** Returns a copy of [input] with `clientVersion` and `requestTime` added to its context. */
    private fun withClientMetadata(input: RunAgentInput): RunAgentInput =
        input.copy(
            context = input.context + mapOf(
                "clientVersion" to "1.0.0",
                "requestTime" to System.currentTimeMillis().toString()
            )
        )
}

Event Filtering

/**
 * Agent that forwards only text-content, tool-call-start and run-finished events;
 * every other event type is dropped from the stream.
 */
class FilteringAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> =
        processRequest(input).filter { candidate ->
            candidate is TextMessageContentEvent ||
                candidate is ToolCallStartEvent ||
                candidate is RunFinishedEvent
        }
}

Response Transformation

/** Agent that reformats the delta of text-content events before passing them on. */
class TransformingAgent : AbstractAgent(url, config) {
    override fun run(input: RunAgentInput): Flow<BaseEvent> =
        processRequest(input).map { incoming ->
            // Only text deltas are rewritten; all other events flow through untouched.
            if (incoming is TextMessageContentEvent) {
                incoming.copy(delta = formatContent(incoming.delta))
            } else {
                incoming
            }
        }

    /** Trims surrounding whitespace, then turns each literal backslash-n pair into a newline. */
    private fun formatContent(content: String): String {
        val trimmed = content.trim()
        return trimmed.replace("\\n", "\n")
    }
}

Error Handling

/**
 * Agent that converts upstream request failures into a protocol-level [ErrorEvent]
 * instead of letting them terminate the stream with an exception.
 */
class RobustAgent : AbstractAgent(url, config) {
    /**
     * Runs the request and, if it fails with an [Exception], emits a single [ErrorEvent]
     * describing the failure.
     *
     * The `catch` operator is used rather than a try/catch around `collect { emit(...) }`:
     * it only observes upstream failures, so exceptions thrown by the downstream collector
     * and cancellation of the collecting coroutine are not swallowed.
     */
    override fun run(input: RunAgentInput): Flow<BaseEvent> =
        processRequest(input).catch { cause ->
            // Only Exception subtypes are converted; anything else keeps propagating.
            if (cause !is Exception) throw cause

            // Convert exceptions to protocol errors
            emit(ErrorEvent(
                error = AgentError(
                    type = "custom_error",
                    message = cause.message ?: "Unknown error",
                    details = mapOf("exception" to cause::class.simpleName)
                )
            ))
        }
}

Best Practices

Configuration Validation

/**
 * Agent that validates its configuration during construction and fails fast with an
 * IllegalArgumentException when a requirement is not met.
 */
class ValidatingAgent : AbstractAgent(url, config) {
    init {
        // A system prompt must be present and contain non-whitespace text.
        val hasSystemPrompt = !config.systemPrompt.isNullOrBlank()
        require(hasSystemPrompt) {
            "System prompt is required"
        }

        // At least one of bearer token / API key must be non-blank.
        val hasCredentials =
            !(config.bearerToken.isNullOrBlank() && config.apiKey.isNullOrBlank())
        require(hasCredentials) {
            "Authentication is required"
        }
    }
}

Resource Management

/**
 * Agent that numbers its requests and, in debug mode, logs when each one starts
 * and when it completes.
 */
class ResourceAwareAgent : AbstractAgent(url, config) {
    private val requestCounter = AtomicInteger(0)

    /**
     * Assigns the next request number and returns a flow that logs around the request.
     *
     * The number is taken when `run` is called, not when the returned flow is collected.
     */
    override fun run(input: RunAgentInput): Flow<BaseEvent> {
        val requestId = requestCounter.incrementAndGet()

        return flow {
            try {
                if (config.debug) println("Starting request $requestId")

                processRequest(input).collect { emit(it) }
            } finally {
                // Runs on normal completion, failure and cancellation alike.
                if (config.debug) println("Completed request $requestId")
            }
        }
    }
}

Thread Safety

/**
 * Agent that tracks the coroutine Job of every in-flight request so that a request
 * can be cancelled from outside by thread id and run id.
 */
class ThreadSafeAgent : AbstractAgent(url, config) {
    private val activeRequests = ConcurrentHashMap<String, Job>()

    /**
     * Registers the collecting coroutine's Job under `"<threadId>-<runId>"` for the
     * duration of the request and unregisters it when the request ends.
     */
    override fun run(input: RunAgentInput): Flow<BaseEvent> = flow {
        val requestKey = "${input.threadId}-${input.runId}"
        val job = currentCoroutineContext().job

        // Track active request
        activeRequests[requestKey] = job

        try {
            processRequest(input).collect { event ->
                emit(event)
            }
        } finally {
            // Conditional removal: drop the entry only if it still maps to this Job, so a
            // finishing run cannot evict a newer run registered under the same key.
            activeRequests.remove(requestKey, job)
        }
    }

    /**
     * Cancels the tracked request for the given ids; does nothing when no such
     * request is currently registered.
     */
    fun cancelRequest(threadId: String, runId: String) {
        val requestKey = "$threadId-$runId"
        activeRequests[requestKey]?.cancel()
    }
}