zoobzio December 10, 2025 Edit this page

Reliability

Herald provides composable pipeline options for building resilient message bridges. These options wrap the publish/subscribe operations with retry logic, timeouts, circuit breakers, and rate limiting.

Middleware

Herald exposes pipz primitives for adding processing steps via WithMiddleware. Middleware operates on *Envelope[T], providing access to both the value and metadata.

UseApply

Transform data with possible failure:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseApply[Order]("validate", func(ctx context.Context, env *herald.Envelope[Order]) (*herald.Envelope[Order], error) {
            if env.Value.Total < 0 {
                return env, errors.New("invalid total")
            }
            return env, nil
        }),
    ),
})

UseEffect

Side effects without modifying data:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseEffect[Order]("log", func(ctx context.Context, env *herald.Envelope[Order]) error {
            log.Printf("processing order %s", env.Value.ID)
            return nil
        }),
    ),
})

UseTransform

Pure transformation that cannot fail:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseTransform[Order]("timestamp", func(ctx context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
            env.Value.ProcessedAt = time.Now()
            return env
        }),
    ),
})

Error Behavior

Return an error to abort processing:

  • Publisher: Error prevents publish, emits to ErrorSignal
  • Subscriber: Error triggers Nack(), emits to ErrorSignal

Execution Order

Middleware executes in order, then the terminal:

opts := []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseEffect[Order]("first", firstFn),   // runs first
        herald.UseEffect[Order]("second", secondFn), // runs second
    ),
}

Execution order: first → second → terminal

Pipeline Options

Retry

Retry failed operations immediately:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRetry[Order](3), // Retry up to 3 times
})

Use for transient failures like network blips.

Backoff

Retry with exponential backoff:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithBackoff[Order](3, 100*time.Millisecond),
    // Delays: 100ms, 200ms, 400ms
})

Use when the downstream system needs recovery time.

Timeout

Cancel operations that take too long:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithTimeout[Order](5 * time.Second),
})

Prevents hanging on slow brokers.

Circuit Breaker

Stop attempting operations after repeated failures:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithCircuitBreaker[Order](5, 30*time.Second),
    // Opens after 5 failures, recovers after 30s
})

Prevents cascade failures when a broker is down.

Rate Limit

Limit operations per second:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRateLimit[Order](100, 10),
    // 100 ops/sec with burst of 10
})

Protects downstream systems from overload.

Error Handler

Custom error handling via pipz chainable:

import "github.com/zoobz-io/pipz"

errorLogger := pipz.Effect("log-error", func(ctx context.Context, err *pipz.Error[*herald.Envelope[Order]]) error {
    log.Printf("publish failed: %v (order: %s)", err.Err, err.InputData.Value.ID)
    metrics.IncrementPublishError()
    return nil
})

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithErrorHandler[Order](errorLogger),
})

The error handler observes errors without modifying the error flow — the original error still propagates after the handler processes it.

Custom Pipeline

Full control over the pipeline:

import "github.com/zoobz-io/pipz"

customPipeline := pipz.NewSequence("custom",
    pipz.NewRetry[*herald.Envelope[Order]]("retry", nil, 3),
    pipz.NewTimeout[*herald.Envelope[Order]]("timeout", nil, 5*time.Second),
)

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithPipeline[Order](customPipeline),
})

Combining Options

Options are applied in order, wrapping inside-out:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRateLimit[Order](100, 10),    // Outermost: rate limit first
    herald.WithCircuitBreaker[Order](5, 30*time.Second),
    herald.WithBackoff[Order](3, 100*time.Millisecond),
    herald.WithTimeout[Order](5*time.Second), // Innermost: timeout per attempt
})

Execution order:

  1. Rate limit (wait if needed)
  2. Check circuit breaker (fail fast if open)
  3. Attempt with backoff (retry on failure)
  4. Each attempt has 5s timeout

Timing Example

With the configuration above, a failing publish follows this timeline:

t=0ms      Rate limit check (pass)
t=0ms      Circuit breaker check (closed, pass)
t=0ms      Attempt 1 starts
t=5000ms   Attempt 1 times out
t=5000ms   Backoff delay: 100ms
t=5100ms   Attempt 2 starts
t=10100ms  Attempt 2 times out
t=10100ms  Backoff delay: 200ms (exponential)
t=10300ms  Attempt 3 starts
t=15300ms  Attempt 3 times out
t=15300ms  All retries exhausted, error emitted

Total worst-case duration: ~15.3 seconds (3 attempts × 5s timeout + backoff delays)

Backoff vs Retry

WithRetry retries immediately; WithBackoff retries with exponential delays:

// Immediate retry - good for transient network blips
herald.WithRetry[Order](3)
// Timeline: attempt1 → fail → attempt2 → fail → attempt3

// Backoff retry - good when downstream needs recovery time
herald.WithBackoff[Order](3, 100*time.Millisecond)
// Timeline: attempt1 → fail → 100ms → attempt2 → fail → 200ms → attempt3

Use backoff when the failure is likely due to load or temporary unavailability. Use immediate retry for random packet loss or brief disconnections.

Rate Limit + Circuit Breaker

When combined, rate limiting applies before the circuit breaker check:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRateLimit[Order](100, 10),
    herald.WithCircuitBreaker[Order](5, 30*time.Second),
})

Behavior:

  • Rate limit controls throughput (100 ops/sec, burst of 10)
  • Circuit breaker tracks failures from operations that pass the rate limit
  • If the circuit opens, requests fail fast without consuming rate limit tokens
  • When the circuit recovers, rate limiting resumes normally

This combination protects both directions: rate limiting prevents overwhelming a healthy downstream, while circuit breaker stops hammering a failing one.

High-Availability Publishing

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithBackoff[Order](5, 200*time.Millisecond),
    herald.WithTimeout[Order](10*time.Second),
    herald.WithCircuitBreaker[Order](10, time.Minute),
})

Rate-Limited Subscription

sub := herald.NewSubscriber(provider, signal, key, []herald.Option[Order]{
    herald.WithRateLimit[Order](50, 5),
    herald.WithTimeout[Order](30*time.Second),
})

Fail-Fast for Critical Paths

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithTimeout[Order](time.Second),
    // No retry - fail immediately for latency-sensitive operations
})

Error Propagation

When all retries are exhausted or the circuit is open, errors propagate to capitan:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)

    switch {
    case errors.Is(err.Err, pipz.ErrCircuitOpen):
        alertOps("Circuit breaker open for " + err.Signal)
    case errors.Is(err.Err, context.DeadlineExceeded):
        log.Printf("Timeout publishing to %s", err.Signal)
    default:
        log.Printf("Publish failed: %v", err.Err)
    }
})

Performance Considerations

Pipeline options add minimal overhead:

  • Retry/Backoff: Only active on failures
  • Timeout: Single context wrapper
  • Circuit Breaker: Atomic counter check
  • Rate Limit: Token bucket algorithm

For maximum throughput, use options sparingly:

// High-throughput path: minimal options
pub := herald.NewPublisher(provider, signal, key, nil)

// Critical path: full protection
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRetry[Order](3),
    herald.WithTimeout[Order](5*time.Second),
})

Integration with pipz

Herald's options are powered by pipz. For advanced use cases, use pipz directly:

import "github.com/zoobz-io/pipz"

// Build a custom pipeline with pipz primitives
fallback := pipz.Effect[*herald.Envelope[Order]]("fallback", func(ctx context.Context, env *herald.Envelope[Order]) error {
    return fallbackPublish(ctx, env)
})

pipeline := pipz.NewSequence("custom",
    pipz.NewRetry[*herald.Envelope[Order]]("retry", nil, 3),
    pipz.NewFallback[*herald.Envelope[Order]]("fallback", nil, fallback),
)

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithPipeline[Order](pipeline),
})