zoobzio December 10, 2025

API Reference

Complete reference for herald's public API.

Core Types

Provider

type Provider interface {
    Publish(ctx context.Context, data []byte, metadata Metadata) error
    Subscribe(ctx context.Context) <-chan Result[Message]
    Ping(ctx context.Context) error
    Close() error
}

The interface all broker providers implement.

| Method | Description |
| --- | --- |
| Publish | Send raw bytes with metadata to the broker |
| Subscribe | Return a stream of messages from the broker |
| Ping | Verify broker connectivity for health checks |
| Close | Release broker resources |
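
To make the contract concrete, here is a minimal in-memory sketch of a type with the same method set. The Metadata, Message, Result, and Provider declarations are local stand-ins that mirror the signatures on this page (Result is simplified to exported fields), and memProvider, newMemProvider, and roundTrip are hypothetical names, not part of herald. Close is a no-op because the sketch holds no external resources.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the declarations on this page.
type Metadata map[string]string

type Message struct {
	Data     []byte
	Metadata Metadata
	Ack      func() error
	Nack     func() error
}

// Result is simplified to exported fields for this sketch.
type Result[T any] struct {
	Val T
	Err error
}

type Provider interface {
	Publish(ctx context.Context, data []byte, metadata Metadata) error
	Subscribe(ctx context.Context) <-chan Result[Message]
	Ping(ctx context.Context) error
	Close() error
}

// memProvider is a hypothetical in-memory broker backed by a buffered channel.
type memProvider struct {
	queue chan Message
}

var _ Provider = (*memProvider)(nil) // compile-time interface check

func newMemProvider(size int) *memProvider {
	return &memProvider{queue: make(chan Message, size)}
}

func (p *memProvider) Publish(ctx context.Context, data []byte, md Metadata) error {
	noop := func() error { return nil } // nothing to settle in memory
	select {
	case p.queue <- Message{Data: data, Metadata: md, Ack: noop, Nack: noop}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (p *memProvider) Subscribe(ctx context.Context) <-chan Result[Message] {
	out := make(chan Result[Message])
	go func() {
		defer close(out) // closing the stream signals the end of delivery
		for {
			select {
			case <-ctx.Done():
				return
			case msg := <-p.queue:
				select {
				case out <- Result[Message]{Val: msg}:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

func (p *memProvider) Ping(ctx context.Context) error { return ctx.Err() }

func (p *memProvider) Close() error { return nil }

// roundTrip publishes one payload and reads it back from the stream.
func roundTrip(payload string) (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the Subscribe goroutine
	p := newMemProvider(1)
	defer p.Close()
	if err := p.Publish(ctx, []byte(payload), Metadata{"content-type": "text/plain"}); err != nil {
		return "", err
	}
	res := <-p.Subscribe(ctx)
	if res.Err != nil {
		return "", res.Err
	}
	return string(res.Val.Data), res.Val.Ack()
}

func main() {
	fmt.Println(roundTrip("hello"))
}
```

A real provider would map Ack and Nack onto the broker's own acknowledgement calls instead of no-ops.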

Message

type Message struct {
    Data     []byte
    Metadata Metadata
    Ack      func() error
    Nack     func() error
}

Represents a message received from a broker.

| Field | Description |
| --- | --- |
| Data | Raw message payload |
| Metadata | Message headers/attributes |
| Ack | Acknowledge successful processing |
| Nack | Reject for redelivery |
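
The sketch below illustrates the intended contract of the two callbacks: acknowledge when processing succeeds, reject when it fails so the broker can redeliver. Message and Metadata are local stand-ins copied from this page; handle and demo are hypothetical helpers, and this is not herald's internal subscriber code.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations on this page.
type Metadata map[string]string

type Message struct {
	Data     []byte
	Metadata Metadata
	Ack      func() error
	Nack     func() error
}

// handle runs process on the payload, then settles the message:
// Ack on success, Nack on failure.
func handle(msg Message, process func([]byte) error) error {
	if err := process(msg.Data); err != nil {
		if nackErr := msg.Nack(); nackErr != nil {
			return fmt.Errorf("process: %w (nack also failed: %v)", err, nackErr)
		}
		return err
	}
	return msg.Ack()
}

// demo reports which callback was invoked for a passing or failing handler.
func demo(fail bool) string {
	outcome := "unsettled"
	msg := Message{
		Data: []byte(`{"id":1}`),
		Ack:  func() error { outcome = "ack"; return nil },
		Nack: func() error { outcome = "nack"; return nil },
	}
	_ = handle(msg, func([]byte) error {
		if fail {
			return errors.New("boom")
		}
		return nil
	})
	return outcome
}

func main() {
	fmt.Println(demo(false), demo(true)) // prints: ack nack
}
```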

Metadata

type Metadata map[string]string

Key-value pairs for message headers.

Result[T]

type Result[T any] struct {
    // unexported fields
}

func NewSuccess[T any](value T) Result[T]
func NewError[T any](err error) Result[T]

func (r Result[T]) IsSuccess() bool
func (r Result[T]) IsError() bool
func (r Result[T]) Value() T
func (r Result[T]) Error() error

Discriminated union that holds either a value or an error.
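
As an illustration of how such a type is consumed, the following sketch reimplements the documented method set with assumed unexported fields (value and err) and drains a channel of results. The field layout, collect, and demo are assumptions for this example; herald's actual internals are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Result is a stand-in with the documented method set; the fields are assumed.
type Result[T any] struct {
	value T
	err   error
}

func NewSuccess[T any](value T) Result[T] { return Result[T]{value: value} }
func NewError[T any](err error) Result[T] { return Result[T]{err: err} }

func (r Result[T]) IsSuccess() bool { return r.err == nil }
func (r Result[T]) IsError() bool   { return r.err != nil }
func (r Result[T]) Value() T        { return r.value }
func (r Result[T]) Error() error    { return r.err }

// collect drains the stream, keeping successful values and counting failures.
func collect(results <-chan Result[string]) (values []string, failures int) {
	for r := range results {
		if r.IsError() {
			failures++
			continue
		}
		values = append(values, r.Value())
	}
	return values, failures
}

func demo() ([]string, int) {
	ch := make(chan Result[string], 3)
	ch <- NewSuccess("a")
	ch <- NewError[string](errors.New("decode failed"))
	ch <- NewSuccess("b")
	close(ch)
	return collect(ch)
}

func main() {
	fmt.Println(demo()) // prints: [a b] 1
}
```

Checking IsError before calling Value avoids reading the zero value of T from a failed result.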

Error

type Error struct {
    Operation string // "publish", "subscribe", "unmarshal", "ack", "nack"
    Signal    string // Signal name
    Err       string // Error message (string for JSON serialization)
    Nack      bool   // Whether message was nack'd
    Raw       []byte // Raw payload (for unmarshal errors)
}

Error information emitted on ErrorSignal. Note that Err is a string (not error) to support JSON serialization.
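
The reason for the string field is easy to see with encoding/json: a plain error value created by errors.New has no exported fields and marshals to an empty object, while a string survives a round trip. The sketch below uses a local copy of the struct and assumes no JSON struct tags, since none are shown here; roundTrip and marshalPlainError are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Error is a local copy of the struct documented above (no JSON tags assumed).
type Error struct {
	Operation string
	Signal    string
	Err       string
	Nack      bool
	Raw       []byte
}

// roundTrip encodes the value to JSON and decodes it again.
func roundTrip(in Error) (Error, error) {
	b, err := json.Marshal(in)
	if err != nil {
		return Error{}, err
	}
	var out Error
	err = json.Unmarshal(b, &out)
	return out, err
}

// marshalPlainError shows what encoding/json produces for an errors.New value.
func marshalPlainError() string {
	b, _ := json.Marshal(errors.New("boom"))
	return string(b)
}

func main() {
	in := Error{
		Operation: "unmarshal",
		Signal:    "order.created",
		Err:       "invalid character",
		Nack:      true,
		Raw:       []byte("{bad"),
	}
	out, err := roundTrip(in)
	fmt.Println(out.Err, string(out.Raw), err)
	fmt.Println(marshalPlainError()) // the error message is lost
}
```

Note that encoding/json writes the Raw byte slice as a base64 string, so the payload is not human-readable in the serialized form.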

Publisher

NewPublisher

func NewPublisher[T any](
    provider Provider,
    signal capitan.Signal,
    key capitan.Key[T],
    opts []Option[T],
    pubOpts ...PublisherOption[T],
) *Publisher[T]

Creates a new publisher.

| Parameter | Description |
| --- | --- |
| provider | Broker provider |
| signal | Capitan signal to observe |
| key | Typed key for extracting values |
| opts | Pipeline options (retry, timeout, etc.) |
| pubOpts | Publisher-specific options |

Publisher Methods

func (p *Publisher[T]) Start()
func (p *Publisher[T]) Close() error

| Method | Description |
| --- | --- |
| Start | Begin observing the signal. Must be called exactly once. |
| Close | Stop observing and wait for in-flight publishes |

Publisher Options

func WithPublisherCodec[T any](codec Codec) PublisherOption[T]
func WithPublisherCapitan[T any](c *capitan.Capitan) PublisherOption[T]

| Option | Description |
| --- | --- |
| WithPublisherCodec | Use custom serialization codec |
| WithPublisherCapitan | Use custom capitan instance |

Subscriber

NewSubscriber

func NewSubscriber[T any](
    provider Provider,
    signal capitan.Signal,
    key capitan.Key[T],
    opts []Option[T],
    subOpts ...SubscriberOption[T],
) *Subscriber[T]

Creates a new subscriber.

| Parameter | Description |
| --- | --- |
| provider | Broker provider |
| signal | Capitan signal to emit to |
| key | Typed key for creating fields |
| opts | Pipeline options |
| subOpts | Subscriber-specific options |

Subscriber Methods

func (s *Subscriber[T]) Start(ctx context.Context)
func (s *Subscriber[T]) Close() error

| Method | Description |
| --- | --- |
| Start | Begin consuming messages. Must be called exactly once. The context controls the subscriber's lifetime. |
| Close | Stop consuming and wait for the consumer goroutine to exit |

Subscriber Options

func WithSubscriberCodec[T any](codec Codec) SubscriberOption[T]
func WithSubscriberCapitan[T any](c *capitan.Capitan) SubscriberOption[T]

| Option | Description |
| --- | --- |
| WithSubscriberCodec | Use custom deserialization codec |
| WithSubscriberCapitan | Use custom capitan instance |

Codec

Codec Interface

type Codec interface {
    Marshal(v any) ([]byte, error)
    Unmarshal(data []byte, v any) error
    ContentType() string
}
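
A custom codec only needs these three methods. The sketch below builds one on the standard library's encoding/gob as an example. Codec is redeclared locally to keep the block self-contained; gobCodec, Order, roundTrip, and the "application/x-gob" content type label are assumptions made for illustration.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Codec mirrors the interface documented above.
type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	ContentType() string
}

// gobCodec is a hypothetical codec built on encoding/gob.
type gobCodec struct{}

var _ Codec = gobCodec{} // compile-time interface check

func (gobCodec) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Unmarshal decodes into v, which must be a pointer.
func (gobCodec) Unmarshal(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// ContentType returns an illustrative label, not a registered media type.
func (gobCodec) ContentType() string { return "application/x-gob" }

// Order is a hypothetical payload type.
type Order struct {
	ID    string
	Total float64
}

// roundTrip encodes and decodes an Order through the Codec interface.
func roundTrip(in Order) (Order, error) {
	var c Codec = gobCodec{}
	data, err := c.Marshal(in)
	if err != nil {
		return Order{}, err
	}
	var out Order
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	fmt.Println(roundTrip(Order{ID: "A-1", Total: 12.5}))
}
```

A codec like this would be passed to WithPublisherCodec and WithSubscriberCodec. Both sides need the same codec for messages to decode.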

JSONCodec

type JSONCodec struct{}

Default codec using encoding/json.

Pipeline Options

type Option[T any] func(pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

WithRetry

func WithRetry[T any](maxAttempts int) Option[T]

Retry failed operations up to maxAttempts times.

WithBackoff

func WithBackoff[T any](maxAttempts int, baseDelay time.Duration) Option[T]

Retry with exponential backoff. Delays double each attempt.

WithTimeout

func WithTimeout[T any](timeout time.Duration) Option[T]

Cancel operations exceeding the timeout.

WithCircuitBreaker

func WithCircuitBreaker[T any](maxFailures int, recoveryTime time.Duration) Option[T]

Open the circuit after maxFailures consecutive failures. Attempt recovery after recoveryTime.
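
The behavior described above can be sketched as a small state machine. This is an illustrative, non-concurrent model with an injectable clock, not the implementation herald uses; breaker, errOpen, and demo are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

var errOpen = errors.New("circuit open")

// breaker is a hypothetical, non-concurrent circuit breaker model.
type breaker struct {
	maxFailures  int
	recoveryTime time.Duration
	failures     int // consecutive failures so far
	openedAt     time.Time
	now          func() time.Time
}

func (b *breaker) Do(op func() error) error {
	if b.failures >= b.maxFailures && b.now().Sub(b.openedAt) < b.recoveryTime {
		return errOpen // still open: fail fast without calling op
	}
	// Closed, or the recovery window has elapsed and a trial call is allowed.
	if err := op(); err != nil {
		b.failures++
		if b.failures >= b.maxFailures {
			b.openedAt = b.now() // (re)open the circuit
		}
		return err
	}
	b.failures = 0 // success closes the circuit
	return nil
}

// demo drives the breaker with a fake clock and records each outcome.
func demo() string {
	clock := time.Unix(0, 0)
	b := &breaker{maxFailures: 2, recoveryTime: time.Second, now: func() time.Time { return clock }}
	fail := func() error { return errors.New("broker down") }
	ok := func() error { return nil }

	var out []string
	record := func(err error) {
		switch {
		case err == nil:
			out = append(out, "ok")
		case errors.Is(err, errOpen):
			out = append(out, "open")
		default:
			out = append(out, "fail")
		}
	}
	record(b.Do(fail))
	record(b.Do(fail)) // second consecutive failure opens the circuit
	record(b.Do(ok))   // rejected: recovery time has not elapsed
	clock = clock.Add(2 * time.Second)
	record(b.Do(ok)) // trial call succeeds and closes the circuit
	return strings.Join(out, ",")
}

func main() {
	fmt.Println(demo()) // prints: fail,fail,open,ok
}
```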

WithRateLimit

func WithRateLimit[T any](rate float64, burst int) Option[T]

Limit operations to rate per second, allowing bursts of up to burst operations.

WithErrorHandler

func WithErrorHandler[T any](handler pipz.Chainable[*pipz.Error[*Envelope[T]]]) Option[T]

Custom error handling via a pipz chainable. The handler receives rich error context including the input Envelope, timestamp, and path through the pipeline. The handler observes errors without modifying the error flow.

// Example: Log errors with context
errorLogger := pipz.Effect("log-error", func(ctx context.Context, err *pipz.Error[*herald.Envelope[Order]]) error {
    log.Printf("failed at %v: %v (input: %+v)", err.Path, err.Err, err.InputData.Value)
    return nil
})

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithErrorHandler[Order](errorLogger),
})

WithPipeline

func WithPipeline[T any](custom pipz.Chainable[*Envelope[T]]) Option[T]

Use a fully custom pipeline. Replaces any default processing.

Middleware

Middleware processors are used inside WithMiddleware to build processing pipelines. All processors operate on *Envelope[T], providing access to both Value and Metadata.

WithFilter

func WithFilter[T any](name string, condition func(context.Context, *Envelope[T]) bool) Option[T]

Wrap the pipeline with a condition. If the condition returns false, the pipeline is skipped.

WithMiddleware

func WithMiddleware[T any](processors ...pipz.Chainable[*Envelope[T]]) Option[T]

Wrap the pipeline with a sequence of processors. Processors execute in order, with the wrapped pipeline last.

herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseEffect[Order]("log", logFn),
        herald.UseApply[Order]("validate", validateFn),
    ),
})

UseTransform

func UseTransform[T any](name string, fn func(context.Context, *Envelope[T]) *Envelope[T]) pipz.Chainable[*Envelope[T]]

Pure transformation that cannot fail. Use it when the operation always succeeds.

UseApply

func UseApply[T any](name string, fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) pipz.Chainable[*Envelope[T]]

Transform with possible error. Use for validation, enrichment, or fallible transformations.

UseEffect

func UseEffect[T any](name string, fn func(context.Context, *Envelope[T]) error) pipz.Chainable[*Envelope[T]]

Side effect only. The envelope passes through unchanged. Use for logging, metrics, or notifications.

UseMutate

func UseMutate[T any](name string, transformer func(context.Context, *Envelope[T]) *Envelope[T], condition func(context.Context, *Envelope[T]) bool) pipz.Chainable[*Envelope[T]]

Conditional transformation. The transformer is only applied if the condition returns true.

UseEnrich

func UseEnrich[T any](name string, fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) pipz.Chainable[*Envelope[T]]

Optional enhancement. If enrichment fails, processing continues with the original envelope.
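
The fall-back-to-original semantics can be modeled in a few lines. In this sketch Envelope and Metadata are local stand-ins from this page, while enrichFn, tryEnrich, addRegion, Order, and regionHeader are hypothetical names. It models the described behavior and is not herald's code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations on this page.
type Metadata map[string]string

type Envelope[T any] struct {
	Value    T
	Metadata Metadata
}

// Order is a hypothetical payload type.
type Order struct{ ID, Region string }

// enrichFn has the same shape as the fn parameter of UseEnrich.
type enrichFn[T any] func(context.Context, *Envelope[T]) (*Envelope[T], error)

// tryEnrich applies fn and falls back to the original envelope on failure.
func tryEnrich[T any](ctx context.Context, env *Envelope[T], fn enrichFn[T]) *Envelope[T] {
	out, err := fn(ctx, env)
	if err != nil {
		return env
	}
	return out
}

// addRegion copies the envelope and attaches a region header via Metadata.
func addRegion(_ context.Context, env *Envelope[Order]) (*Envelope[Order], error) {
	if env.Value.Region == "" {
		return nil, errors.New("region unknown")
	}
	md := Metadata{}
	for k, v := range env.Metadata {
		md[k] = v
	}
	md["x-region"] = env.Value.Region
	return &Envelope[Order]{Value: env.Value, Metadata: md}, nil
}

// regionHeader returns the x-region header after optional enrichment.
func regionHeader(o Order) string {
	env := &Envelope[Order]{Value: o, Metadata: Metadata{"trace-id": "t-1"}}
	out := tryEnrich(context.Background(), env, addRegion)
	return out.Metadata["x-region"]
}

func main() {
	fmt.Printf("%q %q\n", regionHeader(Order{ID: "1", Region: "eu"}), regionHeader(Order{ID: "2"}))
}
```

With UseApply the same failure would stop the pipeline, which is the practical difference between the two processors.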

UseRetry

func UseRetry[T any](maxAttempts int, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

Wrap a processor with retry logic. Failed operations retry immediately.

UseBackoff

func UseBackoff[T any](maxAttempts int, baseDelay time.Duration, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

Wrap a processor with exponential backoff retry logic.

UseTimeout

func UseTimeout[T any](d time.Duration, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

Wrap a processor with a deadline. Operations exceeding the duration fail.

UseFallback

func UseFallback[T any](primary pipz.Chainable[*Envelope[T]], fallbacks ...pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

Wrap a processor with fallback alternatives. If primary fails, each fallback is tried in order.

UseFilter

func UseFilter[T any](name string, condition func(context.Context, *Envelope[T]) bool, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]

Wrap a processor with a condition. If false, the envelope passes through unchanged.

UseRateLimit

func UseRateLimit[T any](rate float64, burst int) pipz.Chainable[*Envelope[T]]

Rate-limiting processor using a token bucket algorithm.
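
For readers unfamiliar with the algorithm, here is a minimal token bucket: tokens refill at rate per second up to burst, and each operation spends one. The sketch rejects when the bucket is empty, whereas a real limiter may wait instead. bucket, newBucket, allow, and demo are hypothetical names and not herald's implementation.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// bucket is a hypothetical, non-concurrent token bucket.
type bucket struct {
	rate   float64 // tokens added per second
	burst  float64 // maximum tokens held
	tokens float64
	last   time.Time
}

func newBucket(rate float64, burst int, now time.Time) *bucket {
	return &bucket{rate: rate, burst: float64(burst), tokens: float64(burst), last: now}
}

// allow refills the bucket for the elapsed time, then tries to spend one token.
func (b *bucket) allow(now time.Time) bool {
	elapsed := now.Sub(b.last).Seconds()
	b.tokens = math.Min(b.burst, b.tokens+elapsed*b.rate)
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// demo uses explicit timestamps so the outcome does not depend on wall time.
func demo() []bool {
	t0 := time.Unix(0, 0)
	b := newBucket(2, 3, t0)
	var got []bool
	for i := 0; i < 4; i++ {
		got = append(got, b.allow(t0)) // burst of 3 passes, the 4th is rejected
	}
	later := t0.Add(500 * time.Millisecond) // 0.5s at 2/s refills one token
	got = append(got, b.allow(later), b.allow(later))
	return got
}

func main() {
	fmt.Println(demo()) // prints: [true true true false true false]
}
```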

Envelope

Envelope[T]

type Envelope[T any] struct {
    Value    T
    Metadata Metadata
}

Wraps a value with metadata for pipeline processing. Access metadata in middleware via env.Metadata.

Error Signals

ErrorSignal

var ErrorSignal = capitan.NewSignal("herald.error", "Herald operational error")

Signal for all herald errors.

ErrorKey

var ErrorKey = capitan.NewKey[Error]("error", "herald.Error")

Key for extracting error details.

MetadataKey

var MetadataKey = capitan.NewKey[Metadata]("metadata", "herald.Metadata")

Key for extracting message metadata from subscriber events. Use in Capitan hooks to access broker headers.

Sentinel Errors

var (
    ErrNoWriter = errors.New("herald: no writer configured")
    ErrNoReader = errors.New("herald: no reader configured")
)

| Error | Description |
| --- | --- |
| ErrNoWriter | Provider has no write capability |
| ErrNoReader | Provider has no read capability |
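
Sentinel errors like these are normally matched with errors.Is so that wrapped errors are still recognized. The sketch below redeclares the two sentinels locally; publish and classify are hypothetical helpers, and which herald calls return these errors is an assumption not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the sentinels documented above.
var (
	ErrNoWriter = errors.New("herald: no writer configured")
	ErrNoReader = errors.New("herald: no reader configured")
)

// publish is a hypothetical call that wraps the sentinel with context.
func publish(hasWriter bool) error {
	if !hasWriter {
		return fmt.Errorf("publish order.created: %w", ErrNoWriter)
	}
	return nil
}

// classify separates configuration mistakes from other failures.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNoWriter):
		return "misconfigured: no writer"
	case errors.Is(err, ErrNoReader):
		return "misconfigured: no reader"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(publish(true)))
	fmt.Println(classify(publish(false)))
}
```

Configuration errors of this kind are usually not worth retrying, so a check like this can short-circuit retry logic.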