zoobzio December 10, 2025

Error Handling

Herald routes all operational errors through capitan's event system, enabling centralized error handling and observability.

Error Signal

All herald errors are emitted on herald.ErrorSignal:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)
    log.Printf("[herald] %s: %v", err.Operation, err.Err)
})

Error Structure

type Error struct {
    Operation string // "publish", "subscribe", "unmarshal", "ack", "nack"
    Signal    string // The signal name involved
    Err       string // The error message (string for JSON serialization)
    Nack      bool   // Whether the message was nack'd
    Raw       []byte // Raw message data (for unmarshal errors)
}

Note: Err is a string rather than error to support JSON serialization of error events.
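The reason for the string field can be seen with the standard library alone. The sketch below is not herald code: Error is a local stand-in that mirrors the documented fields, and withErrorField, encode, and demo are hypothetical names introduced for illustration. It compares how encoding/json treats an error-typed field and a string field.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Error is a local mirror of the documented herald.Error fields,
// so the example runs without importing herald.
type Error struct {
	Operation string
	Signal    string
	Err       string
	Nack      bool
	Raw       []byte
}

// withErrorField is a hypothetical variant that stores the error interface.
type withErrorField struct {
	Err error
}

// encode marshals v to JSON and returns the result as a string.
func encode(v any) string {
	data, err := json.Marshal(v)
	if err != nil {
		return "marshal failed: " + err.Error()
	}
	return string(data)
}

// demo returns the JSON for the error-typed variant and the string-typed one.
func demo() (lost, kept string) {
	cause := errors.New("connection refused")
	lost = encode(withErrorField{Err: cause})
	kept = encode(Error{Operation: "publish", Signal: "orders", Err: cause.Error()})
	return lost, kept
}

func main() {
	lost, kept := demo()
	fmt.Println(lost) // {"Err":{}}
	fmt.Println(kept) // {"Operation":"publish","Signal":"orders","Err":"connection refused","Nack":false,"Raw":null}
}
```

A value created by errors.New has no exported fields, so its message is dropped when marshaled, while the string form survives. Note also that encoding/json writes a non-nil []byte such as Raw as a base64 string.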

Error Types

Publish Errors

Occur when publishing to a broker fails:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)
    if err.Operation == "publish" {
        log.Printf("Failed to publish to %s: %v", err.Signal, err.Err)
        // Retry logic, alerting, etc.
    }
})

Common causes:

  • Broker unavailable
  • Network timeout
  • Authentication failure
  • Topic/queue doesn't exist

Subscribe Errors

Occur when consuming from a broker fails:

if err.Operation == "subscribe" {
    log.Printf("Subscription error on %s: %v", err.Signal, err.Err)
}

Common causes:

  • Connection lost
  • Consumer group rebalance
  • Permission denied

Unmarshal Errors

Occur when deserializing a message fails:

if err.Operation == "unmarshal" {
    log.Printf("Deserialization failed: %v", err.Err)
    log.Printf("Raw payload: %s", err.Raw)
    log.Printf("Message was nack'd: %t", err.Nack)
}

The Raw field contains the original message bytes for debugging.

Common causes:

  • Codec mismatch (JSON vs Protobuf)
  • Schema incompatibility
  • Corrupted message
  • Empty payload

Ack/Nack Errors

Occur when acknowledging or rejecting a message fails:

if err.Operation == "ack" {
    log.Printf("Failed to acknowledge message: %v", err.Err)
}

if err.Operation == "nack" {
    log.Printf("Failed to nack message: %v", err.Err)
}

Common causes:

  • Connection lost after processing
  • Message already ack'd/expired
  • Broker timeout
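The per-operation checks in this section can be combined into a single switch inside one hook. The following is a minimal sketch, assuming only the documented Operation values; Error is a local stand-in for herald.Error, and describe is a hypothetical helper, not part of herald.

```go
package main

import "fmt"

// Error is a local mirror of the documented herald.Error fields.
type Error struct {
	Operation string
	Signal    string
	Err       string
	Nack      bool
	Raw       []byte
}

// describe builds a log line for each documented operation.
// Unknown operations fall through to a generic message.
func describe(e Error) string {
	switch e.Operation {
	case "publish":
		return fmt.Sprintf("failed to publish to %s: %s", e.Signal, e.Err)
	case "subscribe":
		return fmt.Sprintf("subscription error on %s: %s", e.Signal, e.Err)
	case "unmarshal":
		return fmt.Sprintf("deserialization failed on %s (nack=%t, %d raw bytes): %s",
			e.Signal, e.Nack, len(e.Raw), e.Err)
	case "ack", "nack":
		return fmt.Sprintf("failed to %s message on %s: %s", e.Operation, e.Signal, e.Err)
	default:
		return fmt.Sprintf("unknown operation %q on %s: %s", e.Operation, e.Signal, e.Err)
	}
}

func main() {
	fmt.Println(describe(Error{Operation: "unmarshal", Signal: "orders", Err: "unexpected EOF", Nack: true, Raw: []byte("{")}))
	fmt.Println(describe(Error{Operation: "ack", Signal: "orders", Err: "broker timeout"}))
}
```

Inside a real hook, the value passed to describe would be the one extracted with herald.ErrorKey.From.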

Error Handling Patterns

Centralized Logging

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)

    slog.Error("herald error",
        "operation", err.Operation,
        "signal", err.Signal,
        "error", err.Err,
    )
})

Metrics Collection

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)

    metrics.Counter("herald_errors_total",
        "operation", err.Operation,
        "signal", err.Signal,
    ).Inc()
})
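The metrics package in the hook above is not specified here. As an illustration of what such a counter does, the sketch below keeps in-memory counts keyed by operation and signal. errorCounts, Inc, and Get are hypothetical names, and the mutex assumes hooks may run concurrently, which this page does not state.

```go
package main

import (
	"fmt"
	"sync"
)

// countKey identifies one (operation, signal) pair.
type countKey struct {
	Operation string
	Signal    string
}

// errorCounts is a hypothetical in-memory counter, safe for concurrent use.
type errorCounts struct {
	mu     sync.Mutex
	counts map[countKey]int
}

func newErrorCounts() *errorCounts {
	return &errorCounts{counts: make(map[countKey]int)}
}

// Inc adds one to the counter for the given operation and signal.
func (c *errorCounts) Inc(operation, signal string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[countKey{operation, signal}]++
}

// Get returns the current count for the given operation and signal.
func (c *errorCounts) Get(operation, signal string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[countKey{operation, signal}]
}

func main() {
	c := newErrorCounts()
	c.Inc("publish", "orders")
	c.Inc("publish", "orders")
	c.Inc("unmarshal", "orders")
	fmt.Println(c.Get("publish", "orders"), c.Get("unmarshal", "orders"))
}
```

In a hook, Inc would be called with err.Operation and err.Signal. A production setup would more likely export these counts through an existing metrics library.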

Alerting

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)

    if err.Operation == "publish" {
        // Critical: messages not being delivered
        alertOps(fmt.Sprintf("Publishing failed: %s - %v", err.Signal, err.Err))
    }
})

Dead Letter Handling

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)

    if err.Operation == "unmarshal" && err.Raw != nil {
        // Store failed messages for later analysis
        deadLetterQueue.Send(ctx, err.Raw, map[string]string{
            "signal":    err.Signal,
            "error":     err.Err,
            "timestamp": time.Now().Format(time.RFC3339),
        })
    }
})

Sentinel Errors

Herald provides sentinel errors for common cases:

var (
    ErrNoWriter = errors.New("herald: no writer configured")
    ErrNoReader = errors.New("herald: no reader configured")
)

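Check for specific errors. Within an error hook, Err is a string, so errors.Is cannot be applied to it; compare the message text instead:

if strings.Contains(err.Err, herald.ErrNoWriter.Error()) {
    log.Printf("Provider not configured for publishing")
}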
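Because the Error struct declares Err as a string, a hook cannot pass it to errors.Is. One hedged workaround is to match on the sentinel's message text, as in this sketch. ErrNoWriter is redeclared locally with the message shown above so the example runs without herald, and matchesSentinel is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ErrNoWriter is a local copy of the documented sentinel and its message.
var ErrNoWriter = errors.New("herald: no writer configured")

// matchesSentinel reports whether errText contains the sentinel's message.
// Substring matching also catches wrapped forms such as "publish: <sentinel>".
func matchesSentinel(errText string, sentinel error) bool {
	return strings.Contains(errText, sentinel.Error())
}

func main() {
	wrapped := fmt.Errorf("publish: %w", ErrNoWriter).Error()
	fmt.Println(matchesSentinel(wrapped, ErrNoWriter))
	fmt.Println(matchesSentinel("connection refused", ErrNoWriter))
}
```

Text matching is more brittle than errors.Is: it depends on the sentinel's message staying stable across versions.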

Error Recovery

Automatic Recovery

Pipeline options handle transient errors:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRetry[Order](3),
    herald.WithBackoff[Order](3, 100*time.Millisecond),
})
// Retries automatically before emitting error signal
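To make the retry-then-report behavior concrete, here is a generic sketch of retrying with exponential backoff using only the standard library. It is not herald's implementation, and the exact semantics of WithRetry and WithBackoff may differ; retryWithBackoff and runDemo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff calls fn up to attempts times, doubling the delay after
// each failure. It returns nil on the first success, the last error once the
// attempts are exhausted, or ctx.Err() if the context ends during a wait.
func retryWithBackoff(ctx context.Context, attempts int, base time.Duration, fn func(context.Context) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = fn(ctx); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point waiting after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
	}
	return err
}

// runDemo simulates an operation that fails twice and then succeeds.
// It returns the number of calls made and the final error.
func runDemo() (int, error) {
	calls := 0
	err := retryWithBackoff(context.Background(), 3, time.Millisecond, func(context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := runDemo()
	fmt.Println(calls, err)
}
```

Only when the final attempt fails would a caller surface the error, which matches the idea of emitting the error signal after retries are exhausted.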

Manual Recovery

For persistent errors, implement custom recovery:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)

    if err.Operation == "publish" {
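        // Fallback to backup broker. originalData and metadata are placeholders:
        // Raw is documented only for unmarshal errors, so the application must
        // keep the publish payload available itself.
        backupProvider.Publish(ctx, originalData, metadata)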
    }
})

Testing Error Handling

func TestErrorHandling(t *testing.T) {
    var capturedError herald.Error

    capitan.Hook(herald.ErrorSignal, func(_ context.Context, e *capitan.Event) {
        capturedError, _ = herald.ErrorKey.From(e)
    })

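    // Trigger error condition. failingProvider, signal, key, and order are
    // test fixtures assumed to be defined elsewhere in the package.
    ctx := context.Background()
    provider := &failingProvider{err: errors.New("connection refused")}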
    pub := herald.NewPublisher(provider, signal, key, nil)
    pub.Start()

    capitan.Emit(ctx, signal, key.Field(order))
    capitan.Shutdown()

    if capturedError.Operation != "publish" {
        t.Errorf("expected publish error, got %s", capturedError.Operation)
    }
}