Architecture
Understanding herald's internals helps you reason about data flow, error handling, and extension points.
Overview
┌─────────────────────────────────────────────────────────────────────────┐
│                                 Herald                                  │
│                                                                         │
│  ┌──────────────────────────────┐    ┌──────────────────────────────┐   │
│  │          Publisher           │    │          Subscriber          │   │
│  │                              │    │                              │   │
│  │  ┌────────┐    ┌──────────┐  │    │  ┌──────────┐    ┌────────┐  │   │
│  │  │Capitan │───▶│ Pipeline │  │    │  │ Pipeline │───▶│Capitan │  │   │
│  │  │Listener│    │          │  │    │  │          │    │ Emit   │  │   │
│  │  └────────┘    └────┬─────┘  │    │  └────┬─────┘    └────────┘  │   │
│  │                     │        │    │       │                      │   │
│  │              ┌──────▼─────┐  │    │  ┌────▼───────┐              │   │
│  │              │  Terminal  │  │    │  │  Terminal  │              │   │
│  │              │ (publish)  │  │    │  │   (emit)   │              │   │
│  │              └──────┬─────┘  │    │  └────┬───────┘              │   │
│  └─────────────────────┼────────┘    └───────┼──────────────────────┘   │
│                        │                     │                          │
│                        ▼                     ▲                          │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                             Provider                             │   │
│  │                                                                  │   │
│  │  Publish(ctx, data, metadata)      Subscribe(ctx) <-chan Result  │   │
│  │                                                                  │   │
│  └──────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘
                                     │
                                     ▼
                         ┌───────────────────────┐
                         │    Message Broker     │
                         │  (Kafka/NATS/Redis/   │
                         │   SQS/Pub/Sub/...)    │
                         └───────────────────────┘
Publishing Flow
When a capitan event is emitted, the publisher processes it through a pipeline:
capitan.Emit(ctx, signal, key.Field(value))
│
▼
┌───────────────────┐
│ Capitan Worker │ (async dispatch)
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Publisher Listener│ (registered via pub.Start())
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Extract Value │ key.From(event) → typed value
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Wrap in Envelope │ Envelope{Value: T, Metadata: {}}
└────────┬──────────┘
│
▼
┌───────────────────────────────────────────────────┐
│ Pipeline │
│ │
│ ┌─────────────┐ │
│ │ Rate Limit │ ─── ops/sec + burst control │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Circuit │ ─── fail-fast on repeated errors │
│ │ Breaker │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Timeout │ ─── per-operation deadline │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Backoff │ ─── exponential retry delays │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Retry │ ─── immediate retries │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Middleware │ ─── transform/effect/apply │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Terminal │ ─── serialize + provider.Publish │
│ └─────────────┘ │
└───────────────────────────────────────────────────┘
│
▼
┌───────────────────┐
│ provider.Publish │ (broker-specific)
└───────────────────┘
Terminal (Publish)
The publish terminal:
- Marshals the envelope value using the codec
- Calls provider.Publish(ctx, data, envelope.Metadata)
- Returns error or nil
func (p *Publisher[T]) terminal(ctx context.Context, env *Envelope[T]) error {
	data, err := p.codec.Marshal(env.Value)
	if err != nil {
		return err
	}
	return p.provider.Publish(ctx, data, env.Metadata)
}
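To see the publish terminal in isolation, here is a minimal, self-contained sketch. It assumes a JSON codec and uses a hypothetical `recordingProvider` in place of a real broker; `publisher`, `publishTerminal`, `Order`, and `demo` are illustrative names, not part of herald.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Metadata mirrors the map[string]string described in this document.
type Metadata map[string]string

// Envelope pairs a typed value with transport metadata.
type Envelope[T any] struct {
	Value    T
	Metadata Metadata
}

// publisher is the slice of the provider interface the terminal needs.
type publisher interface {
	Publish(ctx context.Context, data []byte, md Metadata) error
}

// recordingProvider is a hypothetical stand-in that keeps what it is given.
type recordingProvider struct {
	payloads [][]byte
	headers  []Metadata
}

func (r *recordingProvider) Publish(_ context.Context, data []byte, md Metadata) error {
	r.payloads = append(r.payloads, data)
	r.headers = append(r.headers, md)
	return nil
}

// publishTerminal marshals the value (JSON is an assumption here) and hands
// the bytes plus metadata to the provider.
func publishTerminal[T any](ctx context.Context, p publisher, env *Envelope[T]) error {
	data, err := json.Marshal(env.Value)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}
	return p.Publish(ctx, data, env.Metadata)
}

type Order struct {
	ID string `json:"id"`
}

// demo publishes one envelope and reports what the provider received.
func demo() string {
	p := &recordingProvider{}
	env := &Envelope[Order]{Value: Order{ID: "o-1"}, Metadata: Metadata{"trace": "abc"}}
	if err := publishTerminal(context.Background(), p, env); err != nil {
		return "publish failed: " + err.Error()
	}
	return string(p.payloads[0]) + " trace=" + p.headers[0]["trace"]
}

func main() {
	fmt.Println(demo()) // {"id":"o-1"} trace=abc
}
```

Only the value is serialized; the metadata travels separately so the provider can map it to broker-native headers.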
Subscribing Flow
The subscriber consumes from the provider and emits to capitan:
┌───────────────────┐
│ provider.Subscribe│ (returns <-chan Result[Message])
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Subscriber Loop │ (goroutine from sub.Start)
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Result Check │ ─── error? emit to ErrorSignal, continue
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Unmarshal │ codec.Unmarshal(msg.Data, &value)
└────────┬──────────┘
│ error? → Nack + emit to ErrorSignal
▼
┌───────────────────┐
│ Wrap in Envelope │ Envelope{Value: T, Metadata: msg.Metadata}
└────────┬──────────┘
│
▼
┌───────────────────────────────────────────────────┐
│ Pipeline │
│ (same structure as publishing) │
│ │
│ Rate Limit → Circuit Breaker → Timeout → │
│ Backoff → Retry → Middleware → Terminal │
└────────┬──────────────────────────────────────────┘
│
▼
┌───────────────────┐
│ Terminal │ capitan.Emit(ctx, signal, fields...)
└────────┬──────────┘
│ success? → Ack
│ error? → Nack
▼
┌───────────────────┐
│ Capitan Event │ (handlers receive via Hook)
└───────────────────┘
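The flow above can be sketched as a plain loop. This is an illustration under stated assumptions, not herald's implementation: `Result` and `Message` are simplified local types, JSON stands in for the codec, `terminal` stands in for the whole pipeline, and `onError` stands in for emitting to ErrorSignal.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type Metadata map[string]string

// Message and Result are simplified stand-ins for the types named in this document.
type Message struct {
	Data     []byte
	Metadata Metadata
	Ack      func() error
	Nack     func() error
}

type Result[T any] struct {
	Value T
	Err   error
}

type Order struct {
	ID string `json:"id"`
}

// consume drains results, reports failures through onError, and acks or
// nacks each message depending on how decoding and the terminal went.
func consume(results <-chan Result[Message], terminal func(Order, Metadata) error, onError func(op string, err error)) {
	for res := range results {
		if res.Err != nil { // provider-level error: report and keep consuming
			onError("subscribe", res.Err)
			continue
		}
		msg := res.Value
		var order Order
		if err := json.Unmarshal(msg.Data, &order); err != nil {
			onError("unmarshal", err)
			if nerr := msg.Nack(); nerr != nil {
				onError("nack", nerr)
			}
			continue
		}
		if err := terminal(order, msg.Metadata); err != nil {
			if nerr := msg.Nack(); nerr != nil {
				onError("nack", nerr)
			}
			continue
		}
		if aerr := msg.Ack(); aerr != nil {
			onError("ack", aerr)
		}
	}
}

// demo feeds one good message, one malformed message, and one provider error
// through consume and returns the observed trace.
func demo() []string {
	var trace []string
	newMsg := func(data string) Message {
		return Message{
			Data: []byte(data),
			Ack:  func() error { trace = append(trace, "ack"); return nil },
			Nack: func() error { trace = append(trace, "nack"); return nil },
		}
	}
	results := make(chan Result[Message], 3)
	results <- Result[Message]{Value: newMsg(`{"id":"o-1"}`)}
	results <- Result[Message]{Value: newMsg(`not json`)}
	results <- Result[Message]{Err: errors.New("connection reset")}
	close(results)

	consume(results,
		func(o Order, _ Metadata) error { trace = append(trace, "emit:"+o.ID); return nil },
		func(op string, _ error) { trace = append(trace, "error:"+op) },
	)
	return trace
}

func main() {
	fmt.Println(demo()) // [emit:o-1 ack error:unmarshal nack error:subscribe]
}
```

A provider-level error carries no message, so there is nothing to ack or nack and the loop simply continues.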
Terminal (Subscribe)
The subscribe terminal:
- Emits the typed value and metadata as capitan fields
- Returns error or nil (determines ack/nack)
func (s *Subscriber[T]) terminal(ctx context.Context, env *Envelope[T]) error {
	s.cap.Emit(ctx, s.signal,
		s.key.Field(env.Value),
		MetadataKey.Field(env.Metadata),
	)
	return nil
}
Pipeline Construction
Herald uses pipz for pipeline construction. Each option wraps the terminal:
// Options applied in order
opts := []herald.Option[Order]{
	herald.WithRetry[Order](3),               // Wraps terminal
	herald.WithTimeout[Order](5*time.Second), // Wraps retry
	herald.WithRateLimit[Order](100, 10),     // Wraps timeout
}
The final pipeline structure (innermost to outermost):
Terminal
└── Retry
    └── Timeout
        └── RateLimit
Execution flows outside-in: RateLimit → Timeout → Retry → Terminal.
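The wrapping order is easy to verify with plain function decorators. The sketch below does not use pipz; `Handler`, `Option`, `named`, and `build` are hypothetical names chosen only to show why options applied first end up innermost.

```go
package main

import "fmt"

// Handler is a simplified stand-in for a pipeline stage; it is not the pipz API.
type Handler func(trace *[]string) error

// Option wraps a handler in one more layer.
type Option func(Handler) Handler

// named returns an Option whose layer records its name when entered.
func named(name string) Option {
	return func(next Handler) Handler {
		return func(trace *[]string) error {
			*trace = append(*trace, name)
			return next(trace)
		}
	}
}

// build applies options in order, so each one wraps everything before it.
func build(terminal Handler, opts ...Option) Handler {
	h := terminal
	for _, opt := range opts {
		h = opt(h)
	}
	return h
}

// demo builds Retry, Timeout, RateLimit around a terminal and returns the
// order in which the layers run.
func demo() []string {
	terminal := func(trace *[]string) error {
		*trace = append(*trace, "Terminal")
		return nil
	}
	pipeline := build(terminal, named("Retry"), named("Timeout"), named("RateLimit"))
	var trace []string
	_ = pipeline(&trace)
	return trace
}

func main() {
	fmt.Println(demo()) // [RateLimit Timeout Retry Terminal]
}
```

Because the last option applied becomes the outermost layer, reordering the options slice changes behavior: placing the rate limit first, for example, would make every retry attempt count against the limit.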
Option Implementation
Each option returns a function that wraps the pipeline:
func WithRetry[T any](attempts int) Option[T] {
	return func(p pipz.Pipeline[*Envelope[T]]) pipz.Pipeline[*Envelope[T]] {
		return pipz.Retry(p, attempts)
	}
}
Provider Abstraction
Providers abstract broker differences behind a minimal interface:
type Provider interface {
	Publish(ctx context.Context, data []byte, metadata Metadata) error
	Subscribe(ctx context.Context) <-chan Result[Message]
	Ping(ctx context.Context) error
	Close() error
}
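As a rough illustration of what satisfying this interface involves, the sketch below implements a loopback provider on top of a buffered channel. `memoryProvider` is hypothetical and not shipped with herald; `Metadata`, `Message`, `Result`, and the interface itself are redeclared locally so the example compiles on its own, and their exact shapes are assumptions based on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type Metadata map[string]string

type Message struct {
	Data     []byte
	Metadata Metadata
	Ack      func() error
	Nack     func() error
}

type Result[T any] struct {
	Value T
	Err   error
}

type Provider interface {
	Publish(ctx context.Context, data []byte, metadata Metadata) error
	Subscribe(ctx context.Context) <-chan Result[Message]
	Ping(ctx context.Context) error
	Close() error
}

// memoryProvider is a hypothetical loopback provider backed by a buffered channel.
type memoryProvider struct {
	queue chan Message
	done  chan struct{}
	once  sync.Once
}

func (m *memoryProvider) Publish(ctx context.Context, data []byte, md Metadata) error {
	select {
	case <-m.done: // refuse new messages once Close has been called
		return errors.New("provider closed")
	default:
	}
	noop := func() error { return nil } // nothing to acknowledge in memory
	select {
	case <-ctx.Done():
		return ctx.Err()
	case m.queue <- Message{Data: data, Metadata: md, Ack: noop, Nack: noop}:
		return nil
	}
}

func (m *memoryProvider) Subscribe(ctx context.Context) <-chan Result[Message] {
	out := make(chan Result[Message])
	go func() {
		defer close(out) // closing the channel tells the consumer to stop
		for {
			select {
			case <-ctx.Done():
				return
			case <-m.done:
				return
			case msg := <-m.queue:
				select {
				case out <- Result[Message]{Value: msg}:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// Ping always succeeds because there is no remote broker to reach.
func (m *memoryProvider) Ping(context.Context) error { return nil }

func (m *memoryProvider) Close() error {
	m.once.Do(func() { close(m.done) }) // safe to call more than once
	return nil
}

// demo publishes one message and reads it back through Subscribe.
func demo() string {
	var p Provider = &memoryProvider{queue: make(chan Message, 1), done: make(chan struct{})}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := p.Publish(ctx, []byte("hello"), Metadata{"k": "v"}); err != nil {
		return "publish error: " + err.Error()
	}
	res := <-p.Subscribe(ctx)
	_ = p.Close()
	return string(res.Value.Data) + " " + res.Value.Metadata["k"]
}

func main() {
	fmt.Println(demo()) // hello v
}
```

A provider like this can be handy in tests, since it exercises the publisher and subscriber code paths without a running broker.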
Metadata Mapping
Each provider maps Metadata (a map[string]string) to broker-native headers:
| Provider | Publish | Subscribe |
|---|---|---|
| Kafka | kafka.Header | kafka.Header |
| JetStream | nats.Header | nats.Header |
| Pub/Sub | pubsub.Attributes | pubsub.Attributes |
| SQS | MessageAttributes | MessageAttributes |
| AMQP | amqp.Table (Headers) | amqp.Table |
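The mapping is typically a straightforward key/value conversion in each direction. The sketch below uses a hypothetical `header` type (a string key with a byte-slice value, similar in shape to many broker clients' header types) rather than any real client library; `toHeaders` and `fromHeaders` are illustrative names.

```go
package main

import (
	"fmt"
	"sort"
)

type Metadata map[string]string

// header is a hypothetical broker-native header: string key, raw bytes value.
type header struct {
	Key   string
	Value []byte
}

// toHeaders converts metadata to headers. Keys are sorted so the output is
// deterministic; a real provider may not need that.
func toHeaders(md Metadata) []header {
	keys := make([]string, 0, len(md))
	for k := range md {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	hs := make([]header, 0, len(keys))
	for _, k := range keys {
		hs = append(hs, header{Key: k, Value: []byte(md[k])})
	}
	return hs
}

// fromHeaders rebuilds metadata on the subscribe side. If a key repeats,
// the last value wins.
func fromHeaders(hs []header) Metadata {
	md := make(Metadata, len(hs))
	for _, h := range hs {
		md[h.Key] = string(h.Value)
	}
	return md
}

func main() {
	hs := toHeaders(Metadata{"trace-id": "abc", "content-type": "application/json"})
	for _, h := range hs {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
	fmt.Println(fromHeaders(hs)["trace-id"])
}
```

Brokers that allow repeated header keys or non-string values need a policy for the round trip; the last-value-wins choice here is only one option.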
Acknowledgment Mapping
Each provider implements broker-appropriate ack/nack:
type Message struct {
	Data     []byte
	Metadata Metadata
	Ack      func() error // Success acknowledgment
	Nack     func() error // Failure acknowledgment
}
Example (Kafka):
Ack: func() error {
	return reader.CommitMessages(ctx, kafkaMsg)
},
Nack: func() error {
	// Don't commit: the offset stays uncommitted, so the message is
	// redelivered after a consumer restart or rebalance
	return nil
},
Error Flow
All errors are emitted to herald.ErrorSignal:
┌───────────────────┐
│ Error Occurs │ (marshal, publish, unmarshal, ack, nack)
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Create Error │ herald.Error{Operation, Signal, Err, Raw}
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Emit to Capitan │ capitan.Emit(ctx, ErrorSignal, ErrorKey.Field(err))
└────────┬──────────┘
│
▼
┌───────────────────┐
│ Error Handlers │ (registered via capitan.Hook)
└───────────────────┘
Error operations:
- publish: Provider publish failed
- unmarshal: Codec unmarshal failed
- ack: Message acknowledgment failed
- nack: Message nack failed
- subscribe: Provider subscribe returned error
Thread Safety
Herald's own components are thread-safe; provider safety depends on the implementation:
| Component | Safety Model |
|---|---|
| Publisher | Single capitan listener, no shared state |
| Subscriber | Single goroutine consumer |
| Provider | Implementation-dependent (typically thread-safe) |
| Pipeline | Stateless transformations |
Publishers can run concurrently for different signals. Multiple subscribers can consume from different providers. The underlying capitan instance handles listener registration in a thread-safe manner.
Graceful Shutdown
Proper shutdown order prevents message loss:
Publisher Shutdown
capitan.Shutdown() // 1. Drain pending events (ensures all reach publisher)
pub.Close() // 2. Unregister listener
provider.Close() // 3. Close broker connection
Subscriber Shutdown
cancel() // 1. Signal consumer to stop
sub.Close() // 2. Wait for in-flight message to complete
capitan.Shutdown() // 3. Drain any queued capitan events
provider.Close() // 4. Close broker connection
The subscriber tracks in-flight messages. Close() blocks until the current message completes its pipeline (including ack/nack).
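One common way to get this blocking behavior is a done channel that the consume loop closes on exit. The sketch below is a simplified model under that assumption, not herald's implementation; `subscriber`, `start`, and the `handle` callback (standing in for pipeline plus ack/nack) are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type subscriber struct {
	done chan struct{}
}

// start launches the consume loop. Cancellation is only checked between
// messages, so a message that is already being handled always finishes.
func start(ctx context.Context, in <-chan string, handle func(string)) *subscriber {
	s := &subscriber{done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-in:
				if !ok {
					return
				}
				handle(msg)
			}
		}
	}()
	return s
}

// Close blocks until the loop has exited, that is, until any in-flight
// message has completed.
func (s *subscriber) Close() { <-s.done }

// demo cancels while a message is in flight and shows that Close returns
// only after that message was handled.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan string, 1)
	started := make(chan struct{})
	var trace []string
	s := start(ctx, in, func(msg string) {
		close(started)
		time.Sleep(20 * time.Millisecond) // simulate slow handling
		trace = append(trace, "handled:"+msg)
	})
	in <- "m1"
	<-started // the message is now in flight
	cancel()  // 1. signal the consumer to stop
	s.Close() // 2. wait for the in-flight message
	trace = append(trace, "closed")
	return trace
}

func main() {
	fmt.Println(demo()) // [handled:m1 closed]
}
```

Closing the provider only after `Close()` returns matters because the in-flight message may still need the broker connection for its ack or nack.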
Design Decisions
Why Pipelines?
Pipelines provide composable reliability without coupling concerns:
- Retry logic doesn't know about rate limiting
- Circuit breakers don't know about timeouts
- Each concern is testable in isolation
Why Envelopes?
Envelopes carry metadata alongside typed values:
- Middleware can inject headers without knowing the value type
- Metadata flows end-to-end without type assertions
- Clean separation between payload and transport concerns
Why Error Signals?
Emitting errors to capitan enables:
- Centralized error handling across all publishers/subscribers
- Same observability patterns as application events
- Integration with existing capitan infrastructure (logging, metrics)
Why Separate Providers?
Each broker has distinct semantics:
- Kafka has partitions and consumer groups
- NATS has subjects and queue groups
- SQS has visibility timeouts and dead letter queues
A single abstraction would either lose features or become unwieldy. Providers encapsulate broker-specific configuration while exposing a uniform runtime interface.
Next Steps
- Publishing Guide — Detailed publishing patterns
- Subscribing Guide — Detailed subscribing patterns
- Providers Reference — Provider configuration details