Reliability
Herald provides composable pipeline options for building resilient message bridges. These options wrap the publish/subscribe operations with retry logic, timeouts, circuit breakers, and rate limiting.
Middleware
Herald exposes pipz primitives for adding processing steps via WithMiddleware. Middleware operates on *Envelope[T], providing access to both the value and metadata.
UseApply
Transform data with possible failure:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithMiddleware(
herald.UseApply[Order]("validate", func(ctx context.Context, env *herald.Envelope[Order]) (*herald.Envelope[Order], error) {
if env.Value.Total < 0 {
return env, errors.New("invalid total")
}
return env, nil
}),
),
})
UseEffect
Side effects without modifying data:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithMiddleware(
herald.UseEffect[Order]("log", func(ctx context.Context, env *herald.Envelope[Order]) error {
log.Printf("processing order %s", env.Value.ID)
return nil
}),
),
})
UseTransform
Pure transformation that cannot fail:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithMiddleware(
herald.UseTransform[Order]("timestamp", func(ctx context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
env.Value.ProcessedAt = time.Now()
return env
}),
),
})
Error Behavior
Return an error to abort processing:
- Publisher: Error prevents publish, emits to ErrorSignal
- Subscriber: Error triggers Nack(), emits to ErrorSignal
Execution Order
Middleware executes in the order listed, followed by the terminal operation:
opts := []herald.Option[Order]{
herald.WithMiddleware(
herald.UseEffect[Order]("first", firstFn), // runs first
herald.UseEffect[Order]("second", secondFn), // runs second
),
}
Execution order: first → second → terminal
Pipeline Options
Retry
Retry failed operations immediately:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithRetry[Order](3), // Up to 3 attempts in total
})
Use for transient failures like network blips.
Backoff
Retry with exponential backoff:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithBackoff[Order](3, 100*time.Millisecond),
// Up to 3 attempts; delays between attempts: 100ms, 200ms
})
Use when the downstream system needs recovery time.
Timeout
Cancel operations that take too long:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithTimeout[Order](5 * time.Second),
})
Prevents hanging on slow brokers.
Circuit Breaker
Stop attempting operations after repeated failures:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithCircuitBreaker[Order](5, 30*time.Second),
// Opens after 5 failures, recovers after 30s
})
Prevents cascade failures when a broker is down.
Rate Limit
Limit operations per second:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithRateLimit[Order](100, 10),
// 100 ops/sec with burst of 10
})
Protects downstream systems from overload.
Error Handler
Custom error handling via a pipz Chainable:
import "github.com/zoobz-io/pipz"
errorLogger := pipz.Effect("log-error", func(ctx context.Context, err *pipz.Error[*herald.Envelope[Order]]) error {
log.Printf("publish failed: %v (order: %s)", err.Err, err.InputData.Value.ID)
metrics.IncrementPublishError()
return nil
})
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithErrorHandler[Order](errorLogger),
})
The error handler observes errors without modifying the error flow — the original error still propagates after the handler processes it.
Custom Pipeline
Full control over the pipeline:
import "github.com/zoobz-io/pipz"
customPipeline := pipz.NewSequence("custom",
pipz.NewRetry[*herald.Envelope[Order]]("retry", nil, 3),
pipz.NewTimeout[*herald.Envelope[Order]]("timeout", nil, 5*time.Second),
)
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithPipeline[Order](customPipeline),
})
Combining Options
Options are applied in the order listed: the first option is the outermost wrapper and the last is the innermost:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithRateLimit[Order](100, 10), // Outermost: rate limit first
herald.WithCircuitBreaker[Order](5, 30*time.Second),
herald.WithBackoff[Order](3, 100*time.Millisecond),
herald.WithTimeout[Order](5*time.Second), // Innermost: timeout per attempt
})
Execution order:
- Rate limit (wait if needed)
- Check circuit breaker (fail fast if open)
- Attempt with backoff (retry on failure)
- Each attempt has 5s timeout
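The execution order above is what you get from ordinary function decoration when the first wrapper in the list ends up outermost. This stdlib-only sketch illustrates that composition rule; it is not how herald builds its pipeline internally, and `op`, `wrapper`, `named`, and `compose` are hypothetical names.

```go
package main

import "fmt"

type op func() error

// wrapper is a hypothetical stand-in for an option: it decorates an operation.
type wrapper func(next op) op

// named returns a wrapper that records when it is entered.
func named(name string, trace *[]string) wrapper {
	return func(next op) op {
		return func() error {
			*trace = append(*trace, name)
			return next()
		}
	}
}

// compose wraps terminal so that wrappers[0] ends up outermost.
func compose(terminal op, wrappers ...wrapper) op {
	for i := len(wrappers) - 1; i >= 0; i-- {
		terminal = wrappers[i](terminal)
	}
	return terminal
}

func main() {
	var trace []string
	publish := func() error { trace = append(trace, "publish"); return nil }

	pipeline := compose(publish,
		named("rate-limit", &trace),
		named("circuit-breaker", &trace),
		named("backoff", &trace),
		named("timeout", &trace),
	)
	pipeline()
	fmt.Println(trace) // [rate-limit circuit-breaker backoff timeout publish]
}
```

Because the retry layer sits outside the timeout layer, each retry re-enters the timeout wrapper, which is why every attempt gets its own 5s budget.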
Timing Example
With the configuration above, a failing publish follows this timeline:
t=0ms Rate limit check (pass)
t=0ms Circuit breaker check (closed, pass)
t=0ms Attempt 1 starts
t=5000ms Attempt 1 times out
t=5000ms Backoff delay: 100ms
t=5100ms Attempt 2 starts
t=10100ms Attempt 2 times out
t=10100ms Backoff delay: 200ms (exponential)
t=10300ms Attempt 3 starts
t=15300ms Attempt 3 times out
t=15300ms All retries exhausted, error emitted
Total worst-case duration: ~15.3 seconds (3 attempts × 5s timeout + backoff delays)
Backoff vs Retry
WithRetry retries immediately; WithBackoff retries with exponential delays:
// Immediate retry - good for transient network blips
herald.WithRetry[Order](3)
// Timeline: attempt1 → fail → attempt2 → fail → attempt3
// Backoff retry - good when downstream needs recovery time
herald.WithBackoff[Order](3, 100*time.Millisecond)
// Timeline: attempt1 → fail → 100ms → attempt2 → fail → 200ms → attempt3
Use backoff when the failure is likely due to load or temporary unavailability. Use immediate retry for random packet loss or brief disconnections.
Rate Limit + Circuit Breaker
When combined, rate limiting applies before the circuit breaker check:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithRateLimit[Order](100, 10),
herald.WithCircuitBreaker[Order](5, 30*time.Second),
})
Behavior:
- Rate limit controls throughput (100 ops/sec, burst of 10)
- Circuit breaker tracks failures from operations that pass the rate limit
- If the circuit is open, requests fail fast at the breaker; because the rate limiter runs first, they have already consumed a rate limit token
- When the circuit recovers, rate limiting resumes normally
This combination protects both directions: rate limiting prevents overwhelming a healthy downstream, while circuit breaker stops hammering a failing one.
Recommended Patterns
High-Availability Publishing
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithBackoff[Order](5, 200*time.Millisecond),
herald.WithTimeout[Order](10*time.Second),
herald.WithCircuitBreaker[Order](10, time.Minute),
})
Rate-Limited Subscription
sub := herald.NewSubscriber(provider, signal, key, []herald.Option[Order]{
herald.WithRateLimit[Order](50, 5),
herald.WithTimeout[Order](30*time.Second),
})
Fail-Fast for Critical Paths
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithTimeout[Order](time.Second),
// No retry - fail immediately for latency-sensitive operations
})
Error Propagation
When all retries are exhausted or the circuit is open, errors propagate to capitan:
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
err, _ := herald.ErrorKey.From(e)
switch {
case errors.Is(err.Err, pipz.ErrCircuitOpen):
alertOps("Circuit breaker open for " + err.Signal)
case errors.Is(err.Err, context.DeadlineExceeded):
log.Printf("Timeout publishing to %s", err.Signal)
default:
log.Printf("Publish failed: %v", err.Err)
}
})
Performance Considerations
Pipeline options add minimal overhead:
- Retry/Backoff: Only active on failures
- Timeout: Single context wrapper
- Circuit Breaker: Atomic counter check
- Rate Limit: Token bucket algorithm
For maximum throughput, use options sparingly:
// High-throughput path: no options
fastPub := herald.NewPublisher(provider, signal, key, nil)
// Critical path: retry and timeout protection
criticalPub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithRetry[Order](3),
herald.WithTimeout[Order](5*time.Second),
})
Integration with pipz
Herald's options are powered by pipz. For advanced use cases, use pipz directly:
import "github.com/zoobz-io/pipz"
// Build a custom pipeline with pipz primitives
fallback := pipz.Effect[*herald.Envelope[Order]]("fallback", func(ctx context.Context, env *herald.Envelope[Order]) error {
return fallbackPublish(ctx, env)
})
pipeline := pipz.NewSequence("custom",
pipz.NewRetry[*herald.Envelope[Order]]("retry", nil, 3),
pipz.NewFallback[*herald.Envelope[Order]]("fallback", nil, fallback),
)
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithPipeline[Order](pipeline),
})