zoobzio January 2, 2026

Core Concepts

Herald has five primitives: publishers, subscribers, providers, envelopes, and pipelines. Understanding these unlocks the full API.

Publishers

A publisher observes a capitan signal and forwards events to a message broker.

pub := herald.NewPublisher(provider, orderCreated, orderKey, nil)
pub.Start()

When Start() is called, the publisher registers as a capitan listener. Every event emitted to that signal is serialized and published to the broker.

// This event automatically flows to Kafka/Redis/NATS/etc.
capitan.Emit(ctx, orderCreated, orderKey.Field(order))

Publisher Lifecycle

pub := herald.NewPublisher(provider, signal, key, opts)
pub.Start()   // Register listener, begin forwarding
// ... emit events ...
pub.Close()   // Unregister listener, cleanup

Always call Close() to release resources.

Subscribers

A subscriber consumes messages from a broker and emits them as capitan events.

sub := herald.NewSubscriber(provider, orderCreated, orderKey, nil)
sub.Start(ctx)

When Start(ctx) is called, the subscriber spawns a goroutine that reads from the broker. Each message is deserialized and emitted as a capitan event.

// Messages from the broker arrive here
capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
    order, _ := orderKey.From(e)
    // process order
})

Subscriber Lifecycle

ctx, cancel := context.WithCancel(context.Background())
sub := herald.NewSubscriber(provider, signal, key, opts)
sub.Start(ctx)  // Spawn consumer goroutine
// ... process messages ...
cancel()        // Signal stop
sub.Close()     // Wait for cleanup

Context cancellation stops message consumption gracefully.

Providers

A provider implements broker-specific communication. Herald includes 11 providers:

| Provider | Package | Use Case |
| --- | --- | --- |
| Kafka | kafka | High-throughput streaming |
| NATS | nats | Lightweight cloud messaging |
| JetStream | jetstream | NATS with persistence |
| Google Pub/Sub | pubsub | GCP managed messaging |
| Redis Streams | redis | In-memory with persistence |
| AWS SQS | sqs | AWS managed queues |
| RabbitMQ/AMQP | amqp | Traditional message broker |
| AWS SNS | sns | Pub/sub fanout |
| BoltDB | bolt | Embedded local queues |
| Firestore | firestore | Firebase/GCP document store |
| io | io | Testing with io.Reader/Writer |

Provider Interface

All providers implement:

type Provider interface {
    Publish(ctx context.Context, data []byte, metadata Metadata) error
    Subscribe(ctx context.Context) <-chan Result[Message]
    Ping(ctx context.Context) error
    Close() error
}

Publishers call Publish. Subscribers read from Subscribe. The interface abstracts broker differences.

Message Structure

type Message struct {
    Data     []byte
    Metadata Metadata
    Ack      func() error
    Nack     func() error
}
  • Data: Raw message bytes
  • Metadata: Headers/attributes (maps to broker-native headers)
  • Ack: Acknowledge successful processing
  • Nack: Signal processing failure (triggers redelivery)

Envelopes

An envelope wraps a typed value with metadata for pipeline processing:

type Envelope[T any] struct {
    Value    T
    Metadata Metadata
}

Middleware operates on envelopes, allowing transformation and header injection before the terminal operation (publish/emit).

herald.UseTransform[Order]("add-trace", func(_ context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
    env.Metadata["trace-id"] = generateTraceID()
    return env
})

Pipelines

Pipelines compose middleware around a terminal operation. Herald uses pipz for pipeline construction.

Pipeline Options

Add reliability features:

opts := []herald.Option[Order]{
    herald.WithRetry[Order](3),                          // Retry up to 3 times
    herald.WithBackoff[Order](3, 100*time.Millisecond),  // Exponential backoff
    herald.WithTimeout[Order](5*time.Second),            // Per-operation timeout
    herald.WithCircuitBreaker[Order](5, 30*time.Second), // Fail-fast on errors
    herald.WithRateLimit[Order](100, 10),                // 100 ops/sec, burst 10
}

pub := herald.NewPublisher(provider, signal, key, opts)

Options wrap the terminal operation in the order listed, producing this pipeline structure:

Rate Limit → Circuit Breaker → Timeout → Backoff → Retry → Terminal (publish/emit)

Middleware

Add custom processing with middleware:

herald.WithMiddleware(
    herald.UseApply[Order]("validate", func(ctx context.Context, env *herald.Envelope[Order]) (*herald.Envelope[Order], error) {
        if env.Value.Total < 0 {
            return nil, errors.New("invalid total")
        }
        return env, nil
    }),
    herald.UseEffect[Order]("log", func(ctx context.Context, env *herald.Envelope[Order]) error {
        log.Printf("Processing order %s", env.Value.ID)
        return nil
    }),
    herald.UseTransform[Order]("enrich", func(ctx context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
        env.Metadata["processed-at"] = time.Now().Format(time.RFC3339)
        return env
    }),
)
| Processor | Purpose |
| --- | --- |
| UseApply | Transform with possible error |
| UseEffect | Side effect, envelope unchanged |
| UseTransform | Pure transform, cannot fail |

Codecs

Codecs handle serialization between typed values and bytes:

type Codec interface {
    Marshal(v any) ([]byte, error)
    Unmarshal(data []byte, v any) error
    ContentType() string
}

Herald defaults to JSON. Use custom codecs for other formats:

pub := herald.NewPublisher(provider, signal, key, opts,
    herald.WithPublisherCodec[Order](myProtobufCodec))

sub := herald.NewSubscriber(provider, signal, key, opts,
    herald.WithSubscriberCodec[Order](myProtobufCodec))

Acknowledgment

Herald handles acknowledgment automatically:

| Outcome | Action |
| --- | --- |
| Successful processing | Ack() called |
| Deserialization failure | Nack() called |
| Pipeline failure | Nack() called |

Each provider maps ack/nack to broker-appropriate behavior:

| Provider | Ack | Nack |
| --- | --- | --- |
| Kafka | Commit offset | Don't commit |
| JetStream | msg.Ack() | msg.Nak() |
| Pub/Sub | msg.Ack() | msg.Nack() |
| SQS | Delete message | Visibility timeout |
| AMQP | Ack(false) | Nack(false, true) |

Error Handling

All errors flow through capitan's event system:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)
    log.Printf("[herald] %s: %v", err.Operation, err.Err)
})

The error struct includes context:

type Error struct {
    Operation string  // "publish", "subscribe", "unmarshal", "ack", "nack"
    Signal    string  // Signal name
    Err       error   // Underlying error
    Raw       []byte  // Raw payload (for unmarshal errors)
}

Metadata

Metadata flows through the entire pipeline:

Publishing: Middleware can add headers → Provider maps to broker headers

herald.UseTransform[Order]("headers", func(_ context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
    env.Metadata["correlation-id"] = correlationID
    return env
})

Subscribing: Broker headers → Metadata → Available in capitan event

capitan.Hook(signal, func(ctx context.Context, e *capitan.Event) {
    meta, _ := herald.MetadataKey.From(e)
    correlationID := meta["correlation-id"]
})

Next Steps