zoobzio January 2, 2026

Architecture

Understanding herald's internals helps you reason about data flow, error handling, and extension points.

Overview

┌─────────────────────────────────────────────────────────────────────────┐
│                              Herald                                      │
│                                                                          │
│  ┌──────────────────────────────┐    ┌──────────────────────────────┐   │
│  │         Publisher            │    │         Subscriber           │   │
│  │                              │    │                              │   │
│  │  ┌────────┐    ┌──────────┐  │    │  ┌──────────┐    ┌────────┐  │   │
│  │  │Capitan │───▶│ Pipeline │  │    │  │ Pipeline │───▶│Capitan │  │   │
│  │  │Listener│    │          │  │    │  │          │    │  Emit  │  │   │
│  │  └────────┘    └────┬─────┘  │    │  └────┬─────┘    └────────┘  │   │
│  │                     │        │    │       │                      │   │
│  │              ┌──────▼─────┐  │    │  ┌────▼───────┐              │   │
│  │              │  Terminal  │  │    │  │  Terminal  │              │   │
│  │              │ (publish)  │  │    │  │  (emit)    │              │   │
│  │              └──────┬─────┘  │    │  └────┬───────┘              │   │
│  └─────────────────────┼────────┘    └───────┼──────────────────────┘   │
│                        │                     │                          │
│                        ▼                     ▲                          │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                          Provider                                 │   │
│  │                                                                   │   │
│  │   Publish(ctx, data, metadata)      Subscribe(ctx) <-chan Result │   │
│  │                                                                   │   │
│  └──────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
                        ┌───────────────────────┐
                        │    Message Broker     │
                        │  (Kafka/NATS/Redis/   │
                        │   SQS/Pub/Sub/...)    │
                        └───────────────────────┘

Publishing Flow

When a capitan event is emitted, the publisher processes it through a pipeline:

capitan.Emit(ctx, signal, key.Field(value))
        │
        ▼
┌───────────────────┐
│  Capitan Worker   │  (async dispatch)
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│ Publisher Listener│  (registered via pub.Start())
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│  Extract Value    │  key.From(event) → typed value
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│  Wrap in Envelope │  Envelope{Value: T, Metadata: {}}
└────────┬──────────┘
         │
         ▼
┌───────────────────────────────────────────────────┐
│                    Pipeline                        │
│                                                    │
│  ┌─────────────┐                                   │
│  │ Rate Limit  │ ─── ops/sec + burst control       │
│  └──────┬──────┘                                   │
│         │                                          │
│  ┌──────▼──────┐                                   │
│  │   Circuit   │ ─── fail-fast on repeated errors  │
│  │   Breaker   │                                   │
│  └──────┬──────┘                                   │
│         │                                          │
│  ┌──────▼──────┐                                   │
│  │   Timeout   │ ─── per-operation deadline        │
│  └──────┬──────┘                                   │
│         │                                          │
│  ┌──────▼──────┐                                   │
│  │   Backoff   │ ─── exponential retry delays      │
│  └──────┬──────┘                                   │
│         │                                          │
│  ┌──────▼──────┐                                   │
│  │    Retry    │ ─── immediate retries             │
│  └──────┬──────┘                                   │
│         │                                          │
│  ┌──────▼──────┐                                   │
│  │ Middleware  │ ─── transform/effect/apply        │
│  └──────┬──────┘                                   │
│         │                                          │
│  ┌──────▼──────┐                                   │
│  │  Terminal   │ ─── serialize + provider.Publish  │
│  └─────────────┘                                   │
└───────────────────────────────────────────────────┘
         │
         ▼
┌───────────────────┐
│ provider.Publish  │  (broker-specific)
└───────────────────┘

Terminal (Publish)

The publish terminal:

  1. Marshals the envelope value using the codec
  2. Calls provider.Publish(ctx, data, envelope.Metadata)
  3. Returns error or nil

func (p *Publisher[T]) terminal(ctx context.Context, env *Envelope[T]) error {
    data, err := p.codec.Marshal(env.Value)
    if err != nil {
        return err
    }
    return p.provider.Publish(ctx, data, env.Metadata)
}

Subscribing Flow

The subscriber consumes from the provider and emits to capitan:

┌───────────────────┐
│ provider.Subscribe│  (returns <-chan Result[Message])
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│  Subscriber Loop  │  (goroutine from sub.Start)
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│  Result Check     │  ─── error? emit to ErrorSignal, continue
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│   Unmarshal       │  codec.Unmarshal(msg.Data, &value)
└────────┬──────────┘
         │  error? → Nack + emit to ErrorSignal
         ▼
┌───────────────────┐
│  Wrap in Envelope │  Envelope{Value: T, Metadata: msg.Metadata}
└────────┬──────────┘
         │
         ▼
┌───────────────────────────────────────────────────┐
│                    Pipeline                        │
│         (same structure as publishing)             │
│                                                    │
│  Rate Limit → Circuit Breaker → Timeout →          │
│  Backoff → Retry → Middleware → Terminal           │
└────────┬──────────────────────────────────────────┘
         │
         ▼
┌───────────────────┐
│    Terminal       │  capitan.Emit(ctx, signal, fields...)
└────────┬──────────┘
         │  success? → Ack
         │  error?   → Nack
         ▼
┌───────────────────┐
│  Capitan Event    │  (handlers receive via Hook)
└───────────────────┘
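The loop above can be reduced to a short sketch. The Message and Result types here are simplified local stand-ins for herald's real (generic) types, and the error-signal emission is elided to a comment:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-ins for herald's Message and Result types.
type Message struct {
	Data []byte
	Ack  func() error
	Nack func() error
}

type Result struct {
	Msg Message
	Err error
}

// consume mirrors the subscriber loop: check the result, unmarshal,
// run the handler, then ack on success or nack on failure.
func consume(ch <-chan Result, handle func(map[string]any) error) (acked, nacked int) {
	for r := range ch {
		if r.Err != nil {
			continue // real herald emits to ErrorSignal and keeps consuming
		}
		var value map[string]any
		if err := json.Unmarshal(r.Msg.Data, &value); err != nil {
			r.Msg.Nack() // undecodable payload: nack, emit to ErrorSignal
			nacked++
			continue
		}
		if err := handle(value); err != nil {
			r.Msg.Nack()
			nacked++
			continue
		}
		r.Msg.Ack()
		acked++
	}
	return acked, nacked
}

func main() {
	noop := func() error { return nil }
	ch := make(chan Result, 2)
	ch <- Result{Msg: Message{Data: []byte(`{"id":"1"}`), Ack: noop, Nack: noop}}
	ch <- Result{Msg: Message{Data: []byte(`not json`), Ack: noop, Nack: noop}}
	close(ch)

	acked, nacked := consume(ch, func(map[string]any) error { return nil })
	fmt.Println(acked, nacked) // 1 1
}
```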

Terminal (Subscribe)

The subscribe terminal:

  1. Emits the typed value and metadata as capitan fields
  2. Returns error or nil (determines ack/nack)

func (s *Subscriber[T]) terminal(ctx context.Context, env *Envelope[T]) error {
    s.cap.Emit(ctx, s.signal,
        s.key.Field(env.Value),
        MetadataKey.Field(env.Metadata),
    )
    return nil
}

Pipeline Construction

Herald uses pipz for pipeline construction. Each option wraps the terminal:

// Options applied in order
opts := []herald.Option[Order]{
    herald.WithRetry[Order](3),      // Wraps terminal
    herald.WithTimeout[Order](5*time.Second),  // Wraps retry
    herald.WithRateLimit[Order](100, 10), // Wraps timeout
}

The final pipeline structure (innermost to outermost):

Terminal
  └── Retry
       └── Timeout
            └── RateLimit

Execution flows outside-in: RateLimit → Timeout → Retry → Terminal.

Option Implementation

Each option returns a function that wraps the pipeline:

func WithRetry[T any](attempts int) Option[T] {
    return func(p pipz.Pipeline[*Envelope[T]]) pipz.Pipeline[*Envelope[T]] {
        return pipz.Retry(p, attempts)
    }
}

Provider Abstraction

Providers abstract broker differences behind a minimal interface:

type Provider interface {
    Publish(ctx context.Context, data []byte, metadata Metadata) error
    Subscribe(ctx context.Context) <-chan Result[Message]
    Ping(ctx context.Context) error
    Close() error
}

Metadata Mapping

Each provider maps Metadata (a map[string]string) to broker-native headers:

Provider     Publish                 Subscribe
Kafka        kafka.Header            kafka.Header
JetStream    nats.Header             nats.Header
Pub/Sub      pubsub.Attributes       pubsub.Attributes
SQS          MessageAttributes       MessageAttributes
AMQP         amqp.Table (Headers)    amqp.Table

Acknowledgment Mapping

Each provider implements broker-appropriate ack/nack:

type Message struct {
    Data     []byte
    Metadata Metadata
    Ack      func() error  // Success acknowledgment
    Nack     func() error  // Failure acknowledgment
}

Example (Kafka):

Ack: func() error {
    return reader.CommitMessages(ctx, kafkaMsg)
}
Nack: func() error {
    // Don't commit - message redelivered on next fetch
    return nil
}

Error Flow

All errors emit to herald.ErrorSignal:

┌───────────────────┐
│   Error Occurs    │  (marshal, publish, unmarshal, ack, nack)
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│  Create Error     │  herald.Error{Operation, Signal, Err, Raw}
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│  Emit to Capitan  │  capitan.Emit(ctx, ErrorSignal, ErrorKey.Field(err))
└────────┬──────────┘
         │
         ▼
┌───────────────────┐
│  Error Handlers   │  (registered via capitan.Hook)
└───────────────────┘

Error operations:

  • publish: Provider publish failed
  • unmarshal: Codec unmarshal failed
  • ack: Message acknowledgment failed
  • nack: Message nack failed
  • subscribe: Provider subscribe returned error
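A handler hooked to ErrorSignal typically dispatches on the operation. Sketched here with a local mirror of the Error fields shown above; the real type and Hook registration live in herald and capitan:

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirror of herald.Error's documented fields.
type Error struct {
	Operation string // publish, unmarshal, ack, nack, subscribe
	Signal    string
	Err       error
	Raw       []byte // original payload, useful for dead-lettering
}

// classify shows the kind of dispatch an ErrorSignal handler might perform.
func classify(e Error) string {
	switch e.Operation {
	case "unmarshal":
		return "poison message: route Raw to a dead-letter store"
	case "publish", "subscribe":
		return "broker trouble: alert and check connectivity"
	default: // ack, nack
		return "delivery bookkeeping failed: expect redelivery"
	}
}

func main() {
	e := Error{Operation: "unmarshal", Signal: "orders", Err: errors.New("bad json")}
	fmt.Println(classify(e))
}
```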

Thread Safety

Herald components are thread-safe:

Component     Safety Model
Publisher     Single capitan listener, no shared state
Subscriber    Single goroutine consumer
Provider      Implementation-dependent (typically thread-safe)
Pipeline      Stateless transformations

Publishers can run concurrently for different signals, and multiple subscribers can consume from different providers. The underlying capitan instance handles listener registration in a thread-safe manner.

Graceful Shutdown

Proper shutdown order prevents message loss:

Publisher Shutdown

capitan.Shutdown()  // 1. Drain pending events (ensures all reach publisher)
pub.Close()         // 2. Unregister listener
provider.Close()    // 3. Close broker connection

Subscriber Shutdown

cancel()            // 1. Signal consumer to stop
sub.Close()         // 2. Wait for in-flight message to complete
capitan.Shutdown()  // 3. Drain any queued capitan events
provider.Close()    // 4. Close broker connection

The subscriber tracks in-flight messages. Close() blocks until the current message completes its pipeline (including ack/nack).

Design Decisions

Why Pipelines?

Pipelines provide composable reliability without coupling concerns:

  • Retry logic doesn't know about rate limiting
  • Circuit breakers don't know about timeouts
  • Each concern is testable in isolation

Why Envelopes?

Envelopes carry metadata alongside typed values:

  • Middleware can inject headers without knowing the value type
  • Metadata flows end-to-end without type assertions
  • Clean separation between payload and transport concerns
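For instance, a middleware can stamp a transport header onto any envelope while staying generic over the payload type. This is a sketch; the real middleware signature is pipz-shaped:

```go
package main

import "fmt"

type Envelope[T any] struct {
	Value    T
	Metadata map[string]string
}

// withHeader wraps a stage and injects a header, without ever
// inspecting or constraining the value type T.
func withHeader[T any](key, val string, next func(*Envelope[T]) error) func(*Envelope[T]) error {
	return func(env *Envelope[T]) error {
		if env.Metadata == nil {
			env.Metadata = map[string]string{}
		}
		env.Metadata[key] = val
		return next(env)
	}
}

func main() {
	stage := withHeader("trace-id", "abc123", func(env *Envelope[string]) error {
		fmt.Println(env.Value, env.Metadata["trace-id"]) // hello abc123
		return nil
	})
	stage(&Envelope[string]{Value: "hello"})
}
```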

Why Error Signals?

Emitting errors to capitan enables:

  • Centralized error handling across all publishers/subscribers
  • Same observability patterns as application events
  • Integration with existing capitan infrastructure (logging, metrics)

Why Separate Providers?

Each broker has distinct semantics:

  • Kafka has partitions and consumer groups
  • NATS has subjects and queue groups
  • SQS has visibility timeouts and dead letter queues

A single abstraction would either lose features or become unwieldy. Providers encapsulate broker-specific configuration while exposing a uniform runtime interface.

Next Steps