Encoding Overview
The encoding package handles encoding, decoding, and content negotiation in the AG-UI Go SDK. It supports multiple formats, streaming operations, and content type selection based on client preferences.
Package Overview
The encoding system is built around a set of well-defined interfaces that follow the Interface Segregation Principle, allowing components to implement only the functionality they need.
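As a rough illustration of that design, the sketch below shows a component that implements only the encoding side and is still accepted wherever an encoder is required. The `Event` struct, the trimmed-down `Encoder`, `Decoder`, and `Codec` interfaces, and `typeOnlyEncoder` are local stand-ins invented for this example; they are not the SDK's definitions.

```go
package main

import (
    "fmt"
    "strings"
)

// Event is a local stand-in for the SDK's event type (assumption).
type Event struct {
    Type string
}

// Encoder and Decoder are deliberately narrow, simplified interfaces.
type Encoder interface {
    Encode(e Event) ([]byte, error)
}

type Decoder interface {
    Decode(data []byte) (Event, error)
}

// Codec composes the two narrow interfaces by embedding them.
type Codec interface {
    Encoder
    Decoder
}

// typeOnlyEncoder implements Encoder but has no Decode method.
type typeOnlyEncoder struct{}

func (typeOnlyEncoder) Encode(e Event) ([]byte, error) {
    return []byte(strings.ToUpper(e.Type)), nil
}

// send depends only on the capability it actually uses.
func send(enc Encoder, e Event) (string, error) {
    b, err := enc.Encode(e)
    if err != nil {
        return "", err
    }
    return string(b), nil
}

func main() {
    out, err := send(typeOnlyEncoder{}, Event{Type: "run_started"})
    if err != nil {
        fmt.Println("encode failed:", err)
        return
    }
    fmt.Println(out) // RUN_STARTED

    var enc Encoder = typeOnlyEncoder{}
    _, isCodec := enc.(Codec)
    fmt.Println(isCodec) // false: the type has no Decode method
}
```

Because `send` asks for an `Encoder` rather than a full `Codec`, write-only components do not need placeholder decoding methods.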
Core Interfaces
Encoder Interface
The Encoder interface defines methods for encoding events to bytes:
type Encoder interface {
    // Encode encodes a single event
    Encode(ctx context.Context, event events.Event) ([]byte, error)
    // EncodeMultiple encodes multiple events efficiently
    EncodeMultiple(ctx context.Context, events []events.Event) ([]byte, error)
    // ContentType returns the MIME type for this encoder
    ContentType() string
}
Decoder Interface
The Decoder interface defines methods for decoding events from bytes:
type Decoder interface {
    // Decode decodes a single event from raw data
    Decode(ctx context.Context, data []byte) (events.Event, error)
    // DecodeMultiple decodes multiple events from raw data
    DecodeMultiple(ctx context.Context, data []byte) ([]events.Event, error)
    // ContentType returns the MIME type for this decoder
    ContentType() string
}
Codec Interface
The Codec interface combines encoding and decoding capabilities:
type Codec interface {
    Encoder
    Decoder
    ContentTypeProvider
    StreamingCapabilityProvider
}
JSON Encoding
The JSON encoder serializes events to JSON and can be configured for cross-SDK compatibility:
// Create a JSON encoder with options
encoder := json.NewJSONEncoder(&encoding.EncodingOptions{
    Pretty:                true, // Format output for readability
    CrossSDKCompatibility: true, // Ensure compatibility with other SDKs
    ValidateOutput:        true, // Validate events before encoding
})
// Encode an event
data, err := encoder.Encode(ctx, event)
if err != nil {
    // Handle encoding error
}
// The encoder returns "application/json" as its content type
contentType := encoder.ContentType()
SSE Writer
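On the wire, Server-Sent Events are plain text: each message consists of one or more `data:` lines followed by a blank line. The sketch below frames a payload that way using only the standard library. `writeSSEData` and `frame` are hypothetical helpers written for this illustration, the payload is made up, and nothing here describes how the SDK's own writer is implemented. It assumes payload lines are separated by `\n`.

```go
package main

import (
    "bytes"
    "fmt"
    "io"
    "strings"
)

// writeSSEData frames payload as a single SSE message (hypothetical helper).
func writeSSEData(w io.Writer, payload string) error {
    // A payload containing newlines is split across several data: lines.
    for _, line := range strings.Split(payload, "\n") {
        if _, err := fmt.Fprintf(w, "data: %s\n", line); err != nil {
            return err
        }
    }
    // A blank line marks the end of the message.
    _, err := io.WriteString(w, "\n")
    return err
}

// frame returns the framed message as a string, for inspection.
func frame(payload string) string {
    var buf bytes.Buffer
    _ = writeSSEData(&buf, payload) // writes to bytes.Buffer do not fail
    return buf.String()
}

func main() {
    fmt.Printf("%q\n", frame(`{"type":"RUN_STARTED"}`))
}
```

In an HTTP handler the same bytes would be written to the response writer and flushed, so the client receives each message as soon as it is produced.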
The SSE (Server-Sent Events) writer streams events to clients in real time:
// Create an SSE writer
writer := sse.NewSSEWriter()
// Write an event to an HTTP response writer
err := writer.WriteEvent(ctx, w, event)
if err != nil {
    // Handle write error
}
// Write an error event
err = writer.WriteErrorEvent(ctx, w, errors.New("something went wrong"), "request-123")
// Flush the writer to ensure data is sent immediately
if flusher, ok := w.(http.Flusher); ok {
    flusher.Flush()
}
Server Implementation Example
Here's a complete example of using the SSE writer in an HTTP endpoint:
func handleAgentStream(w http.ResponseWriter, r *http.Request) {
    // Set headers for SSE
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    // Create SSE writer
    sseWriter := sse.NewSSEWriter()
    // Send events
    events := generateEvents() // Your event generation logic
    for _, event := range events {
        if err := sseWriter.WriteEvent(r.Context(), w, event); err != nil {
            log.Printf("Failed to write event: %v", err)
            return
        }
        // Flush after each event for real-time streaming
        if f, ok := w.(http.Flusher); ok {
            f.Flush()
        }
    }
}
Content Negotiation
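Content negotiation compares the media types in the client's Accept header, each optionally weighted by a quality value (`q`), against the types the server can produce. As a rough, SDK-independent sketch of that selection step, the hypothetical `pickContentType` helper below returns the supported type with the highest `q`. It is a simplification: it handles exact matches only, ignores wildcard ranges such as `*/*`, and does not handle commas inside quoted parameters. It is not the negotiation package's implementation.

```go
package main

import (
    "fmt"
    "mime"
    "strconv"
    "strings"
)

// pickContentType returns the supported media type with the highest q-value
// in the Accept header, or "" if none is acceptable (hypothetical helper).
// Entries in supported are expected to be lowercase.
func pickContentType(accept string, supported []string) string {
    best, bestQ := "", 0.0
    for _, part := range strings.Split(accept, ",") {
        mediaType, params, err := mime.ParseMediaType(strings.TrimSpace(part))
        if err != nil {
            continue // skip empty or malformed entries
        }
        q := 1.0 // q defaults to 1 when absent
        if raw, ok := params["q"]; ok {
            if parsed, err := strconv.ParseFloat(raw, 64); err == nil {
                q = parsed
            }
        }
        for _, s := range supported {
            // q == 0 means "not acceptable", so it never beats bestQ.
            if mediaType == s && q > bestQ {
                best, bestQ = s, q
            }
        }
    }
    return best
}

func main() {
    supported := []string{"application/json", "text/event-stream"}
    accept := "application/json;q=0.5, text/event-stream;q=0.9"
    fmt.Println(pickContentType(accept, supported)) // text/event-stream
}
```

When the helper returns an empty string, a caller would typically fall back to a default type or reject the request.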
The negotiation package implements RFC 7231-compliant content type negotiation:
// Create a negotiator with preferred content type
negotiator := negotiation.NewContentNegotiator("application/json")
// Negotiate based on Accept header
acceptHeader := r.Header.Get("Accept")
contentType, err := negotiator.Negotiate(acceptHeader)
if err != nil {
    // No acceptable content type found
    contentType = "application/json" // Fall back to default
}
// Check if a specific content type is supported
if negotiator.CanHandle("application/protobuf") {
    // Use protobuf encoding
}
// Get list of supported types
supportedTypes := negotiator.SupportedTypes()
Custom Content Types
You can register custom content types with their capabilities:
negotiator.RegisterType(&negotiation.TypeCapabilities{
    ContentType:        "application/vnd.myapp+json",
    CanStream:          true,
    CompressionSupport: []string{"gzip", "deflate"},
    Priority:           0.95,
    Extensions:         []string{".myapp.json"},
    Aliases:            []string{"application/x-myapp"},
})
Streaming Operations
For handling large volumes of events, the encoding package supports streaming operations:
type StreamEncoder interface {
    // EncodeStream encodes events from a channel to a writer
    EncodeStream(ctx context.Context, input <-chan events.Event, output io.Writer) error
    // Session management
    StartStream(ctx context.Context, w io.Writer) error
    EndStream(ctx context.Context) error
    // Event processing
    WriteEvent(ctx context.Context, event events.Event) error
}
Channel-Based Streaming
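As a rough picture of what a channel-based encoder does, the sketch below defines a function with the same shape as `EncodeStream`: it drains a channel into a writer until the channel is closed or the context is cancelled. This is not the SDK's implementation. The local `Event` struct, the newline-delimited JSON output, and the `streamToString` helper are assumptions made for illustration.

```go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "strings"
)

// Event is a local stand-in for the SDK's event type (assumption).
type Event struct {
    Type string `json:"type"`
}

// encodeStream writes each event as one JSON line until input is closed
// or ctx is cancelled.
func encodeStream(ctx context.Context, input <-chan Event, output io.Writer) error {
    enc := json.NewEncoder(output) // Encode appends a newline after each value
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case ev, ok := <-input:
            if !ok {
                return nil // channel closed: the stream ended cleanly
            }
            if err := enc.Encode(ev); err != nil {
                return fmt.Errorf("encode event: %w", err)
            }
        }
    }
}

// streamToString runs encodeStream over a pre-filled, closed channel and
// returns everything that was written.
func streamToString(evs ...Event) (string, error) {
    ch := make(chan Event, len(evs))
    for _, ev := range evs {
        ch <- ev
    }
    close(ch)
    var sb strings.Builder
    err := encodeStream(context.Background(), ch, &sb)
    return sb.String(), err
}

func main() {
    out, err := streamToString(Event{Type: "RUN_STARTED"}, Event{Type: "RUN_FINISHED"})
    if err != nil {
        fmt.Println("stream error:", err)
        return
    }
    fmt.Print(out)
}
```

Closing the channel is the producer's way of signalling the end of the stream, which is why the sender, not the encoder, owns the `close` call.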
Stream events from a channel directly to an output writer:
// Create event channel
eventChan := make(chan events.Event, 100)
// Start streaming in a goroutine
go func() {
    err := streamEncoder.EncodeStream(ctx, eventChan, w)
    if err != nil {
        log.Printf("Streaming error: %v", err)
    }
}()
// Send events to the channel (pendingEvents is a []events.Event prepared elsewhere)
for _, event := range pendingEvents {
    select {
    case eventChan <- event:
        // Event sent
    case <-ctx.Done():
        // Context cancelled: close the channel so the encoder can stop
        close(eventChan)
        return
    }
}
close(eventChan)
Configuration Options
The encoding package exposes the following configuration options:
// EncodingOptions for encoding operations
type EncodingOptions struct {
    Pretty                bool   // Format output for readability
    Compression           string // Compression algorithm (e.g., "gzip")
    BufferSize            int    // Buffer size for streaming
    MaxSize               int64  // Maximum encoded size (0 for unlimited)
    ValidateOutput        bool   // Validate output after encoding
    CrossSDKCompatibility bool   // Ensure compatibility with other SDKs
}
// DecodingOptions for decoding operations
type DecodingOptions struct {
    Strict             bool  // Enable strict validation
    MaxSize            int64 // Maximum input size (0 for unlimited)
    BufferSize         int   // Buffer size for streaming
    AllowUnknownFields bool  // Allow unknown fields in input
    ValidateEvents     bool  // Validate events after decoding
}
Error Handling
The encoding package defines specific error types for better error handling:
// Handle encoding errors
data, err := encoder.Encode(ctx, event)
if err != nil {
    var encErr *encoding.EncodingError
    if errors.As(err, &encErr) {
        log.Printf("Encoding failed for format %s: %s", encErr.Format, encErr.Message)
        if encErr.Cause != nil {
            log.Printf("Underlying error: %v", encErr.Cause)
        }
    }
}
// Handle decoding errors
decoded, err := decoder.Decode(ctx, data)
if err != nil {
    var decErr *encoding.DecodingError
    if errors.As(err, &decErr) {
        log.Printf("Decoding failed for format %s: %s", decErr.Format, decErr.Message)
    }
}
Examples
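The `errors.As` checks in the error-handling snippet keep working when an error has been wrapped with extra context on its way up the call stack. The sketch below shows the mechanics with a local stand-in type modelled on the fields used there (`Format`, `Message`, `Cause`). The `Unwrap` method, the `encode` and `describe` helpers, and the error text are assumptions for illustration; the SDK's real type may differ.

```go
package main

import (
    "errors"
    "fmt"
)

// EncodingError is a local stand-in, not the SDK's type (assumption).
type EncodingError struct {
    Format  string
    Message string
    Cause   error
}

func (e *EncodingError) Error() string {
    if e.Cause != nil {
        return fmt.Sprintf("%s encoding error: %s: %v", e.Format, e.Message, e.Cause)
    }
    return fmt.Sprintf("%s encoding error: %s", e.Format, e.Message)
}

// Unwrap exposes the cause so errors.Is and errors.As can look through it.
func (e *EncodingError) Unwrap() error { return e.Cause }

var errTooLarge = errors.New("payload too large")

// encode simulates a failure that is wrapped with context before returning.
func encode() error {
    inner := &EncodingError{Format: "json", Message: "encode failed", Cause: errTooLarge}
    return fmt.Errorf("handle request: %w", inner)
}

// describe extracts the typed error, if any, from an error chain.
func describe(err error) string {
    var encErr *EncodingError
    if errors.As(err, &encErr) {
        return encErr.Format + ": " + encErr.Message
    }
    return "unknown error"
}

func main() {
    err := encode()
    fmt.Println(describe(err))               // json: encode failed
    fmt.Println(errors.Is(err, errTooLarge)) // true
}
```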
Complete Server Endpoint
Here's a complete example of an AG-UI agent endpoint with content negotiation and SSE streaming:
func handleAgent(w http.ResponseWriter, r *http.Request) {
    // Content negotiation
    negotiator := negotiation.NewContentNegotiator("application/json")
    acceptHeader := r.Header.Get("Accept")
    contentType, err := negotiator.Negotiate(acceptHeader)
    if err != nil {
        contentType = "application/json" // Fall back to default
    }
    // For SSE streaming
    if contentType == "text/event-stream" || acceptHeader == "text/event-stream" {
        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")
        sseWriter := sse.NewSSEWriter()
        // Process request and generate events
        events := processAgentRequest(r)
        for _, event := range events {
            if err := sseWriter.WriteEvent(r.Context(), w, event); err != nil {
                // Best effort: the connection may already be unusable
                _ = sseWriter.WriteErrorEvent(r.Context(), w, err, "req-123")
                return
            }
            if f, ok := w.(http.Flusher); ok {
                f.Flush()
            }
        }
        return
    }
    // For JSON response
    w.Header().Set("Content-Type", "application/json")
    encoder := json.NewJSONEncoder(&encoding.EncodingOptions{
        Pretty: r.URL.Query().Get("pretty") == "true",
    })
    events := processAgentRequest(r)
    data, err := encoder.EncodeMultiple(r.Context(), events)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    if _, err := w.Write(data); err != nil {
        log.Printf("Failed to write response: %v", err)
    }
}
Custom Codec Implementation
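Implement a custom codec for a specific format. The methods below cover the core of the implementation; a type that satisfies the full Codec interface must also provide EncodeMultiple and DecodeMultiple, which the Encoder and Decoder interfaces require: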
type CustomCodec struct {
    options *encoding.EncodingOptions
}
func (c *CustomCodec) Encode(ctx context.Context, event events.Event) ([]byte, error) {
    // Custom encoding logic
    return customSerialize(event)
}
func (c *CustomCodec) Decode(ctx context.Context, data []byte) (events.Event, error) {
    // Custom decoding logic
    return customDeserialize(data)
}
func (c *CustomCodec) ContentType() string {
    return "application/vnd.custom+binary"
}
func (c *CustomCodec) SupportsStreaming() bool {
    return true
}
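The snippet above omits `EncodeMultiple` and `DecodeMultiple`, which the `Encoder` and `Decoder` interfaces also declare. One way to supply them is to loop over the single-event methods, and a compile-time assertion can confirm that the interface is satisfied. The sketch below uses local stand-ins, since the SDK types are not reproduced here: `Event`, a trimmed `Codec` interface, the `LineCodec` type with its one-event-type-per-line format, and the `roundTrip` helper are all assumptions for illustration.

```go
package main

import (
    "bytes"
    "context"
    "errors"
    "fmt"
)

// Event and Codec are simplified local stand-ins, not SDK types (assumption).
type Event struct{ Type string }

type Codec interface {
    Encode(ctx context.Context, event Event) ([]byte, error)
    EncodeMultiple(ctx context.Context, events []Event) ([]byte, error)
    Decode(ctx context.Context, data []byte) (Event, error)
    DecodeMultiple(ctx context.Context, data []byte) ([]Event, error)
    ContentType() string
}

// LineCodec is a hypothetical format: one event type per line.
// Event types containing newlines are not supported by this sketch.
type LineCodec struct{}

// Compile-time assertion: the build fails if a method is missing.
var _ Codec = (*LineCodec)(nil)

func (c *LineCodec) Encode(_ context.Context, event Event) ([]byte, error) {
    if event.Type == "" {
        return nil, errors.New("event type is empty")
    }
    return []byte(event.Type), nil
}

func (c *LineCodec) EncodeMultiple(ctx context.Context, events []Event) ([]byte, error) {
    var buf bytes.Buffer
    for i, ev := range events {
        b, err := c.Encode(ctx, ev)
        if err != nil {
            return nil, fmt.Errorf("event %d: %w", i, err)
        }
        buf.Write(b)
        buf.WriteByte('\n')
    }
    return buf.Bytes(), nil
}

func (c *LineCodec) Decode(_ context.Context, data []byte) (Event, error) {
    if len(data) == 0 {
        return Event{}, errors.New("empty input")
    }
    return Event{Type: string(data)}, nil
}

func (c *LineCodec) DecodeMultiple(ctx context.Context, data []byte) ([]Event, error) {
    if len(data) == 0 {
        return nil, nil
    }
    var out []Event
    // Drop the trailing newline so Split does not yield an empty last element.
    lines := bytes.Split(bytes.TrimSuffix(data, []byte("\n")), []byte("\n"))
    for i, line := range lines {
        ev, err := c.Decode(ctx, line)
        if err != nil {
            return nil, fmt.Errorf("line %d: %w", i, err)
        }
        out = append(out, ev)
    }
    return out, nil
}

func (c *LineCodec) ContentType() string { return "application/vnd.custom+text" }

// roundTrip encodes the events and decodes the result again.
func roundTrip(events []Event) (string, []Event, error) {
    c := &LineCodec{}
    ctx := context.Background()
    data, err := c.EncodeMultiple(ctx, events)
    if err != nil {
        return "", nil, err
    }
    decoded, err := c.DecodeMultiple(ctx, data)
    return string(data), decoded, err
}

func main() {
    wire, decoded, err := roundTrip([]Event{{Type: "RUN_STARTED"}, {Type: "RUN_FINISHED"}})
    if err != nil {
        fmt.Println("round trip failed:", err)
        return
    }
    fmt.Printf("%q -> %d events\n", wire, len(decoded))
}
```

The `var _ Codec = (*LineCodec)(nil)` line costs nothing at run time and turns a missing method into a build error rather than a surprise at the point of use.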