API Reference
Complete reference for herald's public API.
Core Types
Provider
type Provider interface {
Publish(ctx context.Context, data []byte, metadata Metadata) error
Subscribe(ctx context.Context) <-chan Result[Message]
Ping(ctx context.Context) error
Close() error
}
The interface all broker providers implement.
| Method | Description |
|---|---|
Publish | Send raw bytes with metadata to the broker |
Subscribe | Return a stream of messages from the broker |
Ping | Verify broker connectivity for health checks |
Close | Release broker resources |
Message
type Message struct {
Data []byte
Metadata Metadata
Ack func() error
Nack func() error
}
Represents a message received from a broker.
| Field | Description |
|---|---|
Data | Raw message payload |
Metadata | Message headers/attributes |
Ack | Acknowledge successful processing |
Nack | Reject for redelivery |
Metadata
type Metadata map[string]string
Key-value pairs for message headers.
ResultT
type Result[T any] struct {
// unexported fields
}
func NewSuccess[T any](value T) Result[T]
func NewError[T any](err error) Result[T]
func (r Result[T]) IsSuccess() bool
func (r Result[T]) IsError() bool
func (r Result[T]) Value() T
func (r Result[T]) Error() error
Discriminated union for either a value or error.
Error
type Error struct {
Operation string // "publish", "subscribe", "unmarshal", "ack", "nack"
Signal string // Signal name
Err string // Error message (string for JSON serialization)
Nack bool // Whether message was nack'd
Raw []byte // Raw payload (for unmarshal errors)
}
Error information emitted on ErrorSignal. Note that Err is a string (not error) to support JSON serialization.
Publisher
NewPublisher
func NewPublisher[T any](
provider Provider,
signal capitan.Signal,
key capitan.Key[T],
opts []Option[T],
pubOpts ...PublisherOption[T],
) *Publisher[T]
Creates a new publisher.
| Parameter | Description |
|---|---|
provider | Broker provider |
signal | Capitan signal to observe |
key | Typed key for extracting values |
opts | Pipeline options (retry, timeout, etc.) |
pubOpts | Publisher-specific options |
Publisher Methods
func (p *Publisher[T]) Start()
func (p *Publisher[T]) Close() error
| Method | Description |
|---|---|
Start | Begin observing the signal. Must be called exactly once. |
Close | Stop observing and wait for in-flight publishes |
Publisher Options
func WithPublisherCodec[T any](codec Codec) PublisherOption[T]
func WithPublisherCapitan[T any](c *capitan.Capitan) PublisherOption[T]
| Option | Description |
|---|---|
WithPublisherCodec | Use custom serialization codec |
WithPublisherCapitan | Use custom capitan instance |
Subscriber
NewSubscriber
func NewSubscriber[T any](
provider Provider,
signal capitan.Signal,
key capitan.Key[T],
opts []Option[T],
subOpts ...SubscriberOption[T],
) *Subscriber[T]
Creates a new subscriber.
| Parameter | Description |
|---|---|
provider | Broker provider |
signal | Capitan signal to emit to |
key | Typed key for creating fields |
opts | Pipeline options |
subOpts | Subscriber-specific options |
Subscriber Methods
func (s *Subscriber[T]) Start(ctx context.Context)
func (s *Subscriber[T]) Close() error
| Method | Description |
|---|---|
Start | Begin consuming messages. Must be called exactly once. The context controls subscriber lifetime. |
Close | Stop consuming and wait for goroutine to exit |
Subscriber Options
func WithSubscriberCodec[T any](codec Codec) SubscriberOption[T]
func WithSubscriberCapitan[T any](c *capitan.Capitan) SubscriberOption[T]
| Option | Description |
|---|---|
WithSubscriberCodec | Use custom deserialization codec |
WithSubscriberCapitan | Use custom capitan instance |
Codec
Codec Interface
type Codec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
ContentType() string
}
JSONCodec
type JSONCodec struct{}
Default codec using encoding/json.
Pipeline Options
type Option[T any] func(pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
WithRetry
func WithRetry[T any](maxAttempts int) Option[T]
Retry failed operations up to maxAttempts times.
WithBackoff
func WithBackoff[T any](maxAttempts int, baseDelay time.Duration) Option[T]
Retry with exponential backoff. Delays double each attempt.
WithTimeout
func WithTimeout[T any](timeout time.Duration) Option[T]
Cancel operations exceeding the timeout.
WithCircuitBreaker
func WithCircuitBreaker[T any](maxFailures int, recoveryTime time.Duration) Option[T]
Open circuit after maxFailures consecutive failures. Attempt recovery after recoveryTime.
WithRateLimit
func WithRateLimit[T any](rate float64, burst int) Option[T]
Limit operations to rate per second with burst capacity.
WithErrorHandler
func WithErrorHandler[T any](handler pipz.Chainable[*pipz.Error[*Envelope[T]]]) Option[T]
Custom error handling via a pipz chainable. The handler receives rich error context including the input Envelope, timestamp, and path through the pipeline. The handler observes errors without modifying the error flow.
// Example: Log errors with context
errorLogger := pipz.Effect("log-error", func(ctx context.Context, err *pipz.Error[*herald.Envelope[Order]]) error {
log.Printf("failed at %v: %v (input: %+v)", err.Path, err.Err, err.InputData.Value)
return nil
})
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithErrorHandler[Order](errorLogger),
})
WithPipeline
func WithPipeline[T any](custom pipz.Chainable[*Envelope[T]]) Option[T]
Use a fully custom pipeline. Replaces any default processing.
Middleware
Middleware processors are used inside WithMiddleware to build processing pipelines. All processors operate on *Envelope[T], providing access to both Value and Metadata.
WithFilter
func WithFilter[T any](name string, condition func(context.Context, *Envelope[T]) bool) Option[T]
Wrap the pipeline with a condition. If the condition returns false, the pipeline is skipped.
WithMiddleware
func WithMiddleware[T any](processors ...pipz.Chainable[*Envelope[T]]) Option[T]
Wrap the pipeline with a sequence of processors. Processors execute in order, with the wrapped pipeline last.
herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
herald.WithMiddleware(
herald.UseEffect[Order]("log", logFn),
herald.UseApply[Order]("validate", validateFn),
),
})
UseTransform
func UseTransform[T any](name string, fn func(context.Context, *Envelope[T]) *Envelope[T]) pipz.Chainable[*Envelope[T]]
Pure transformation. Cannot fail. Use for transformations that always succeed.
UseApply
func UseApply[T any](name string, fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) pipz.Chainable[*Envelope[T]]
Transform with possible error. Use for validation, enrichment, or fallible transformations.
UseEffect
func UseEffect[T any](name string, fn func(context.Context, *Envelope[T]) error) pipz.Chainable[*Envelope[T]]
Side effect only. The envelope passes through unchanged. Use for logging, metrics, or notifications.
UseMutate
func UseMutate[T any](name string, transformer func(context.Context, *Envelope[T]) *Envelope[T], condition func(context.Context, *Envelope[T]) bool) pipz.Chainable[*Envelope[T]]
Conditional transformation. The transformer is only applied if the condition returns true.
UseEnrich
func UseEnrich[T any](name string, fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) pipz.Chainable[*Envelope[T]]
Optional enhancement. If enrichment fails, processing continues with the original envelope.
UseRetry
func UseRetry[T any](maxAttempts int, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
Wrap a processor with retry logic. Failed operations retry immediately.
UseBackoff
func UseBackoff[T any](maxAttempts int, baseDelay time.Duration, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
Wrap a processor with exponential backoff retry logic.
UseTimeout
func UseTimeout[T any](d time.Duration, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
Wrap a processor with a deadline. Operations exceeding the duration fail.
UseFallback
func UseFallback[T any](primary pipz.Chainable[*Envelope[T]], fallbacks ...pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
Wrap a processor with fallback alternatives. If primary fails, each fallback is tried in order.
UseFilter
func UseFilter[T any](name string, condition func(context.Context, *Envelope[T]) bool, processor pipz.Chainable[*Envelope[T]]) pipz.Chainable[*Envelope[T]]
Wrap a processor with a condition. If false, the envelope passes through unchanged.
UseRateLimit
func UseRateLimit[T any](rate float64, burst int) pipz.Chainable[*Envelope[T]]
Rate limiting processor using token bucket algorithm.
Envelope
EnvelopeT
type Envelope[T any] struct {
Value T
Metadata Metadata
}
Wraps a value with metadata for pipeline processing. Access metadata in middleware via env.Metadata.
Error Signals
ErrorSignal
var ErrorSignal = capitan.NewSignal("herald.error", "Herald operational error")
Signal for all herald errors.
ErrorKey
var ErrorKey = capitan.NewKey[Error]("error", "herald.Error")
Key for extracting error details.
MetadataKey
var MetadataKey = capitan.NewKey[Metadata]("metadata", "herald.Metadata")
Key for extracting message metadata from subscriber events. Use in Capitan hooks to access broker headers.
Sentinel Errors
var (
ErrNoWriter = errors.New("herald: no writer configured")
ErrNoReader = errors.New("herald: no reader configured")
)
| Error | Description |
|---|---|
ErrNoWriter | Provider has no write capability |
ErrNoReader | Provider has no read capability |