Error Handling
Herald routes all operational errors through capitan's event system, enabling centralized error handling and observability.
Error Signal
All herald errors are emitted on herald.ErrorSignal:
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
err, _ := herald.ErrorKey.From(e)
log.Printf("[herald] %s: %v", err.Operation, err.Err)
})
Error Structure
type Error struct {
Operation string // "publish", "subscribe", "unmarshal", "ack", "nack"
Signal string // The signal name involved
Err string // The error message (string for JSON serialization)
Nack bool // Whether the message was nack'd
Raw []byte // Raw message data (for unmarshal errors)
}
Note: Err is a string rather than error to support JSON serialization of error events.
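The difference is easy to see with the standard library alone. The sketch below uses a local stand-in for the struct above rather than importing herald, and the signal name "orders.created" is a made-up example. It marshals the same failure once as a string and once as an error value.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Error is a local stand-in with the same fields as the struct above,
// so the example runs without importing herald.
type Error struct {
	Operation string
	Signal    string
	Err       string
	Nack      bool
	Raw       []byte
}

// marshalStringErr stores the message as a string, as the struct above does.
func marshalStringErr(msg string) (string, error) {
	e := Error{Operation: "publish", Signal: "orders.created", Err: errors.New(msg).Error()}
	b, err := json.Marshal(e)
	return string(b), err
}

// marshalErrorValue stores the same failure in an error-typed field instead.
func marshalErrorValue(msg string) (string, error) {
	v := struct{ Err error }{Err: errors.New(msg)}
	b, err := json.Marshal(v)
	return string(b), err
}

func main() {
	kept, _ := marshalStringErr("connection refused")
	lost, _ := marshalErrorValue("connection refused")
	fmt.Println(kept) // the message survives as a JSON string
	fmt.Println(lost) // the message is lost: the error value has no exported fields
}
```

The second form encodes as an empty object because the value created by errors.New exposes no exported fields to encoding/json.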
Error Types
Publish Errors
Occur when publishing to a broker fails:
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
err, _ := herald.ErrorKey.From(e)
if err.Operation == "publish" {
log.Printf("Failed to publish to %s: %v", err.Signal, err.Err)
// Retry logic, alerting, etc.
}
})
Common causes:
- Broker unavailable
- Network timeout
- Authentication failure
- Topic/queue doesn't exist
Subscribe Errors
Occur when consuming from a broker fails:
if err.Operation == "subscribe" {
log.Printf("Subscription error on %s: %v", err.Signal, err.Err)
}
Common causes:
- Connection lost
- Consumer group rebalance
- Permission denied
Unmarshal Errors
Occur when deserializing a message fails:
if err.Operation == "unmarshal" {
log.Printf("Deserialization failed: %v", err.Err)
log.Printf("Raw payload: %s", err.Raw)
log.Printf("Message was nack'd: %t", err.Nack)
}
The Raw field contains the original message bytes for debugging.
Common causes:
- Codec mismatch (JSON vs Protobuf)
- Schema incompatibility
- Corrupted message
- Empty payload
Ack/Nack Errors
Occur when acknowledging or rejecting a message fails:
if err.Operation == "ack" {
log.Printf("Failed to acknowledge message: %v", err.Err)
}
if err.Operation == "nack" {
log.Printf("Failed to nack message: %v", err.Err)
}
Common causes:
- Connection lost after processing
- Message already ack'd/expired
- Broker timeout
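The per-operation checks above can be folded into a single switch. The following is a minimal sketch, assuming only the Operation values listed in the Error Structure section. Error is a local stand-in for herald.Error and describe is a hypothetical helper, not part of herald.

```go
package main

import "fmt"

// Error is a local stand-in for the struct shown earlier.
type Error struct {
	Operation string
	Signal    string
	Err       string
	Nack      bool
	Raw       []byte
}

// describe (hypothetical helper) builds one log line per operation type.
func describe(e Error) string {
	switch e.Operation {
	case "publish":
		return fmt.Sprintf("failed to publish to %s: %s", e.Signal, e.Err)
	case "subscribe":
		return fmt.Sprintf("subscription error on %s: %s", e.Signal, e.Err)
	case "unmarshal":
		// Raw and Nack are only meaningful for unmarshal errors.
		return fmt.Sprintf("deserialization failed on %s: %s (raw=%q, nack=%t)", e.Signal, e.Err, e.Raw, e.Nack)
	case "ack", "nack":
		return fmt.Sprintf("failed to %s message on %s: %s", e.Operation, e.Signal, e.Err)
	default:
		return fmt.Sprintf("unknown operation %q on %s: %s", e.Operation, e.Signal, e.Err)
	}
}

func main() {
	fmt.Println(describe(Error{Operation: "unmarshal", Signal: "orders", Err: "unexpected EOF", Nack: true, Raw: []byte("{")}))
	fmt.Println(describe(Error{Operation: "ack", Signal: "orders", Err: "broker timeout"}))
}
```

Inside a hook, the value returned by herald.ErrorKey.From(e) would be passed to a function like this one. The default branch keeps the handler safe if new operation names are added later.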
Error Handling Patterns
Centralized Logging
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
err, _ := herald.ErrorKey.From(e)
slog.Error("herald error",
"operation", err.Operation,
"signal", err.Signal,
"error", err.Err,
)
})
Metrics Collection
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
err, _ := herald.ErrorKey.From(e)
metrics.Counter("herald_errors_total",
"operation", err.Operation,
"signal", err.Signal,
).Inc()
})
Alerting
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
err, _ := herald.ErrorKey.From(e)
if err.Operation == "publish" {
// Critical: messages not being delivered
alertOps(fmt.Sprintf("Publishing failed: %s - %v", err.Signal, err.Err))
}
})
Dead Letter Handling
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
err, _ := herald.ErrorKey.From(e)
if err.Operation == "unmarshal" && err.Raw != nil {
// Store failed messages for later analysis
deadLetterQueue.Send(ctx, err.Raw, map[string]string{
"signal": err.Signal,
"error": err.Err,
"timestamp": time.Now().Format(time.RFC3339),
})
}
})
Sentinel Errors
Herald provides sentinel errors for common cases:
var (
ErrNoWriter = errors.New("herald: no writer configured")
ErrNoReader = errors.New("herald: no reader configured")
)
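Error.Err carries only the message text, so match a sentinel by its message rather than with errors.Is:
// errors.Is needs an error value; err.Err is a string, so compare the text.
if strings.Contains(err.Err, herald.ErrNoWriter.Error()) {
log.Printf("Provider not configured for publishing")
}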
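The sketch below contrasts the two situations, assuming the sentinel message listed above. errNoWriter is a local copy so the example runs without herald, and wrapNoWriter and matchesSentinel are hypothetical helpers. errors.Is works while the failure is still an error value. Once it has been flattened into a string field such as Err, only the text can be compared.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errNoWriter is a local copy of the sentinel listed above (assumption:
// the real value lives in the herald package).
var errNoWriter = errors.New("herald: no writer configured")

// wrapNoWriter simulates a call that returns the sentinel with added context.
func wrapNoWriter() error {
	return fmt.Errorf("start publisher: %w", errNoWriter)
}

// matchesSentinel reports whether a flattened message mentions the sentinel.
func matchesSentinel(msg string, sentinel error) bool {
	return strings.Contains(msg, sentinel.Error())
}

func main() {
	err := wrapNoWriter()
	// Error value: errors.Is follows the %w chain.
	fmt.Println(errors.Is(err, errNoWriter))
	// String: the chain is gone, so only the text can be matched.
	fmt.Println(matchesSentinel(err.Error(), errNoWriter))
}
```

Text matching is weaker than errors.Is because it depends on the message staying stable, so prefer errors.Is wherever an error value is still available.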
Error Recovery
Automatic Recovery
Pipeline options handle transient errors:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithRetry[Order](3),
herald.WithBackoff[Order](3, 100*time.Millisecond),
})
// Retries automatically before emitting error signal
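To make the idea concrete, here is a stdlib-only sketch of retrying with exponential backoff. It is not herald's implementation, and the exact semantics of WithRetry and WithBackoff are not specified here. retryWithBackoff and simulate are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff calls fn up to attempts times, doubling the delay after
// each failure, and returns the last error if every attempt fails.
func retryWithBackoff(ctx context.Context, attempts int, base time.Duration, fn func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-time.After(delay):
			delay *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", attempts, err)
}

// simulate runs an operation that fails the first `failures` times and
// reports how many calls were made.
func simulate(failures, attempts int) (int, error) {
	calls := 0
	err := retryWithBackoff(context.Background(), attempts, time.Millisecond, func() error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := simulate(2, 3)
	fmt.Println(calls, err) // transient failure: succeeds on the third call
	calls, err = simulate(5, 3)
	fmt.Println(calls, err) // persistent failure: error surfaces after three calls
}
```

In this model, only the second case would reach an error handler, which matches the behaviour described above: retries are exhausted before the error is reported.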
Manual Recovery
For persistent errors, implement custom recovery:
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
err, _ := herald.ErrorKey.From(e)
if err.Operation == "publish" {
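// Fallback to backup broker. Raw is documented only for unmarshal errors,
// so originalData and metadata are placeholders the application must supply.
backupProvider.Publish(ctx, originalData, metadata)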
}
})
Testing Error Handling
func TestErrorHandling(t *testing.T) {
var capturedError herald.Error
capitan.Hook(herald.ErrorSignal, func(_ context.Context, e *capitan.Event) {
capturedError, _ = herald.ErrorKey.From(e)
})
// Trigger error condition (signal, key, and order are test fixtures defined elsewhere)
ctx := context.Background()
provider := &failingProvider{err: errors.New("connection refused")}
pub := herald.NewPublisher(provider, signal, key, nil)
pub.Start()
capitan.Emit(ctx, signal, key.Field(order))
capitan.Shutdown()
if capturedError.Operation != "publish" {
t.Errorf("expected publish error, got %s", capturedError.Operation)
}
}