Core Concepts
Herald has five primitives: publishers, subscribers, providers, envelopes, and pipelines. The rest of the API (codecs, acknowledgment, error handling, metadata) builds on them.
Publishers
A publisher observes a capitan signal and forwards events to a message broker.
pub := herald.NewPublisher(provider, orderCreated, orderKey, nil)
pub.Start()
When Start() is called, the publisher registers as a capitan listener. Every event emitted to that signal is serialized and published to the broker.
// This event automatically flows to Kafka/Redis/NATS/etc.
capitan.Emit(ctx, orderCreated, orderKey.Field(order))
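The forwarding step can be pictured with a stdlib-only sketch. The `bus`, `listen`, `emit`, and `forward` names below are hypothetical stand-ins, not capitan or Herald APIs; the sketch only shows the shape of the flow, where a listener registered on a signal serializes each event and hands the bytes to a publish function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order is an illustrative payload type.
type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// bus is a hypothetical in-process signal bus standing in for capitan.
type bus struct {
	listeners map[string][]func(Order)
}

func newBus() *bus { return &bus{listeners: map[string][]func(Order){}} }

// listen registers fn for a signal name.
func (b *bus) listen(signal string, fn func(Order)) {
	b.listeners[signal] = append(b.listeners[signal], fn)
}

// emit delivers the order to every listener of the signal.
func (b *bus) emit(signal string, o Order) {
	for _, fn := range b.listeners[signal] {
		fn(o)
	}
}

// forward registers a listener that serializes each event and calls publish,
// which is roughly what a publisher does once started.
func forward(b *bus, signal string, publish func([]byte) error) {
	b.listen(signal, func(o Order) {
		data, err := json.Marshal(o)
		if err != nil {
			return // a real publisher would report this error
		}
		_ = publish(data)
	})
}

// demoForward emits one order and returns what reached the fake broker.
func demoForward() []string {
	var sent []string
	b := newBus()
	forward(b, "order.created", func(data []byte) error {
		sent = append(sent, string(data))
		return nil
	})
	b.emit("order.created", Order{ID: "A1", Total: 9.5})
	return sent
}

func main() {
	fmt.Println(demoForward())
}
```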
Publisher Lifecycle
pub := herald.NewPublisher(provider, signal, key, opts)
pub.Start() // Register listener, begin forwarding
// ... emit events ...
pub.Close() // Unregister listener, cleanup
Always call Close() to release resources.
Subscribers
A subscriber consumes messages from a broker and emits them as capitan events.
sub := herald.NewSubscriber(provider, orderCreated, orderKey, nil)
sub.Start(ctx)
When Start(ctx) is called, the subscriber spawns a goroutine that reads from the broker. Each message is deserialized and emitted as a capitan event.
// Messages from the broker arrive here
capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
	order, _ := orderKey.From(e)
	// process order
})
Subscriber Lifecycle
ctx, cancel := context.WithCancel(context.Background())
sub := herald.NewSubscriber(provider, signal, key, opts)
sub.Start(ctx) // Spawn consumer goroutine
// ... process messages ...
cancel() // Signal stop
sub.Close() // Wait for cleanup
Cancelling the context stops message consumption gracefully; Close() then waits for the consumer goroutine to finish cleaning up.
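The cancel-then-wait pattern can be sketched with the standard library alone. The `consumer` type below is a hypothetical stand-in for a subscriber's read loop, not Herald's implementation; it assumes the loop exits on context cancellation and that `Close` simply waits for the goroutine.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// consumer is a hypothetical stand-in for a subscriber's read loop.
type consumer struct {
	wg   sync.WaitGroup
	seen []string
}

// Start spawns a goroutine that reads until ctx is cancelled or msgs closes.
func (c *consumer) Start(ctx context.Context, msgs <-chan string) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case m, ok := <-msgs:
				if !ok {
					return
				}
				c.seen = append(c.seen, m)
			}
		}
	}()
}

// Close blocks until the read loop has exited.
func (c *consumer) Close() { c.wg.Wait() }

// demoLifecycle sends two messages, cancels, waits, and reports the count.
func demoLifecycle() int {
	ctx, cancel := context.WithCancel(context.Background())
	msgs := make(chan string) // unbuffered: a send returns once it is received
	c := &consumer{}
	c.Start(ctx, msgs)
	msgs <- "a"
	msgs <- "b"
	cancel()  // signal stop
	c.Close() // wait for the goroutine to exit
	return len(c.seen)
}

func main() {
	fmt.Println(demoLifecycle())
}
```

Waiting in `Close` after cancelling is what makes it safe to read `seen` afterwards: no goroutine is still writing to it.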
Providers
A provider implements broker-specific communication. Herald includes 11 providers:
| Provider | Package | Use Case |
|---|---|---|
| Kafka | kafka | High-throughput streaming |
| NATS | nats | Lightweight cloud messaging |
| JetStream | jetstream | NATS with persistence |
| Google Pub/Sub | pubsub | GCP managed messaging |
| Redis Streams | redis | In-memory with persistence |
| AWS SQS | sqs | AWS managed queues |
| RabbitMQ/AMQP | amqp | Traditional message broker |
| AWS SNS | sns | Pub/sub fanout |
| BoltDB | bolt | Embedded local queues |
| Firestore | firestore | Firebase/GCP document store |
| io | io | Testing with io.Reader/Writer |
Provider Interface
All providers implement:
type Provider interface {
	Publish(ctx context.Context, data []byte, metadata Metadata) error
	Subscribe(ctx context.Context) <-chan Result[Message]
	Ping(ctx context.Context) error
	Close() error
}
Publishers call Publish. Subscribers read from Subscribe. The interface abstracts broker differences.
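To make the interface concrete, here is a minimal in-memory provider. It is a sketch under assumptions: the source does not define `Metadata` or `Result`, so they are modeled here as a string map and a value-or-error pair, and `memProvider` is a hypothetical type rather than one of Herald's packages.

```go
package main

import (
	"context"
	"fmt"
)

// Metadata and Result are assumed shapes; the source does not define them.
type Metadata map[string]string

type Result[T any] struct {
	Value T
	Err   error
}

type Message struct {
	Data     []byte
	Metadata Metadata
	Ack      func() error
	Nack     func() error
}

type Provider interface {
	Publish(ctx context.Context, data []byte, metadata Metadata) error
	Subscribe(ctx context.Context) <-chan Result[Message]
	Ping(ctx context.Context) error
	Close() error
}

// memProvider is a hypothetical broker backed by a buffered channel.
type memProvider struct {
	queue chan Result[Message]
}

var _ Provider = (*memProvider)(nil) // compile-time interface check

func newMemProvider(size int) *memProvider {
	return &memProvider{queue: make(chan Result[Message], size)}
}

// Publish enqueues the payload, or gives up if ctx ends first.
func (p *memProvider) Publish(ctx context.Context, data []byte, md Metadata) error {
	msg := Message{
		Data:     data,
		Metadata: md,
		Ack:      func() error { return nil }, // nothing to acknowledge in memory
		Nack:     func() error { return nil },
	}
	select {
	case p.queue <- Result[Message]{Value: msg}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Subscribe exposes the queue as a receive-only channel.
func (p *memProvider) Subscribe(ctx context.Context) <-chan Result[Message] {
	return p.queue
}

func (p *memProvider) Ping(ctx context.Context) error { return nil }

// Close closes the queue; publishing after Close would panic in this sketch.
func (p *memProvider) Close() error {
	close(p.queue)
	return nil
}

// demoProvider publishes one message and reads it back.
func demoProvider() string {
	ctx := context.Background()
	p := newMemProvider(1)
	if err := p.Publish(ctx, []byte("hello"), Metadata{"k": "v"}); err != nil {
		return "publish failed: " + err.Error()
	}
	res := <-p.Subscribe(ctx)
	_ = p.Close()
	return string(res.Value.Data) + " " + res.Value.Metadata["k"]
}

func main() {
	fmt.Println(demoProvider())
}
```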
Message Structure
type Message struct {
	Data     []byte
	Metadata Metadata
	Ack      func() error
	Nack     func() error
}
- Data: Raw message bytes
- Metadata: Headers/attributes (maps to broker-native headers)
- Ack: Acknowledge successful processing
- Nack: Signal processing failure (triggers redelivery)
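The Ack/Nack closures bind each message to its broker-side state. A small sketch of that idea, using a hypothetical `redeliveryQueue` in which Nack requeues the payload and Ack drops it (real brokers implement redelivery differently, and `Metadata` is omitted for brevity):

```go
package main

import "fmt"

type Message struct {
	Data []byte
	Ack  func() error
	Nack func() error
}

// redeliveryQueue is a hypothetical broker: Nack requeues, Ack drops.
type redeliveryQueue struct {
	pending [][]byte
	acked   int
}

// next pops the oldest payload and binds Ack/Nack closures to it.
func (q *redeliveryQueue) next() (Message, bool) {
	if len(q.pending) == 0 {
		return Message{}, false
	}
	data := q.pending[0]
	q.pending = q.pending[1:]
	return Message{
		Data: data,
		Ack:  func() error { q.acked++; return nil },
		Nack: func() error { q.pending = append(q.pending, data); return nil },
	}, true
}

// demoRedelivery fails the first delivery and succeeds on the second.
func demoRedelivery() (deliveries, acked int) {
	q := &redeliveryQueue{pending: [][]byte{[]byte("m1")}}
	for {
		msg, ok := q.next()
		if !ok {
			break
		}
		deliveries++
		if deliveries == 1 {
			_ = msg.Nack() // first attempt fails, so the message is requeued
			continue
		}
		_ = msg.Ack()
	}
	return deliveries, q.acked
}

func main() {
	fmt.Println(demoRedelivery())
}
```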
Envelopes
An envelope wraps a typed value with metadata for pipeline processing:
type Envelope[T any] struct {
	Value    T
	Metadata Metadata
}
Middleware operates on envelopes, allowing transformation and header injection before the terminal operation (publish/emit).
herald.UseTransform[Order]("add-trace", func(_ context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
	env.Metadata["trace-id"] = generateTraceID()
	return env
})
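As a rough illustration of why the envelope exists, the sketch below runs a header-injecting transform before a terminal step. `transform` and `runThen` are hypothetical helpers (Herald builds its pipelines with pipz), `Metadata` is assumed to be a string map, and the trace ID is a fixed string so the result is predictable.

```go
package main

import (
	"context"
	"fmt"
)

// Metadata is assumed to be a string map; the source does not define it.
type Metadata map[string]string

type Envelope[T any] struct {
	Value    T
	Metadata Metadata
}

type Order struct{ ID string }

// transform is a hypothetical middleware signature that cannot fail.
type transform[T any] func(context.Context, *Envelope[T]) *Envelope[T]

// runThen applies each transform in order, then calls the terminal.
func runThen[T any](ctx context.Context, env *Envelope[T], terminal func(*Envelope[T]) error, steps ...transform[T]) error {
	for _, step := range steps {
		env = step(ctx, env)
	}
	return terminal(env)
}

// demoEnvelope injects a header and shows that the terminal sees it.
func demoEnvelope() string {
	addTrace := func(_ context.Context, env *Envelope[Order]) *Envelope[Order] {
		env.Metadata["trace-id"] = "trace-123" // fixed value for the demo
		return env
	}
	var seen string
	terminal := func(env *Envelope[Order]) error {
		seen = env.Value.ID + ":" + env.Metadata["trace-id"]
		return nil
	}
	env := &Envelope[Order]{Value: Order{ID: "A1"}, Metadata: Metadata{}}
	_ = runThen[Order](context.Background(), env, terminal, addTrace)
	return seen
}

func main() {
	fmt.Println(demoEnvelope())
}
```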
Pipelines
Pipelines compose middleware around a terminal operation. Herald uses pipz for pipeline construction.
Pipeline Options
Add reliability features:
opts := []herald.Option[Order]{
	herald.WithRetry[Order](3),                          // Retry up to 3 times
	herald.WithBackoff[Order](3, 100*time.Millisecond),  // Exponential backoff
	herald.WithTimeout[Order](5*time.Second),            // Per-operation timeout
	herald.WithCircuitBreaker[Order](5, 30*time.Second), // Fail-fast on errors
	herald.WithRateLimit[Order](100, 10),                // 100 ops/sec, burst 10
}
pub := herald.NewPublisher(provider, signal, key, opts)
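Options wrap the terminal in the order listed: each option wraps everything before it, so the first option sits closest to the terminal and the last option is outermost. The final pipeline structure: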
Rate Limit → Circuit Breaker → Timeout → Backoff → Retry → Terminal (publish/emit)
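The ordering follows from plain function wrapping. In this sketch, `handler`, `wrap`, and `build` are hypothetical names and each stage only records its name; it reproduces the structure above from the option list in its declared order.

```go
package main

import (
	"fmt"
	"strings"
)

// handler is a hypothetical stand-in for a pipeline stage.
type handler func(trace *[]string)

// wrap returns a handler that records name, then calls inner.
func wrap(name string, inner handler) handler {
	return func(trace *[]string) {
		*trace = append(*trace, name)
		inner(trace)
	}
}

// build wraps the terminal with each option in list order, so the first
// option ends up innermost and the last option outermost.
func build(options []string) handler {
	h := handler(func(trace *[]string) { *trace = append(*trace, "Terminal") })
	for _, name := range options {
		h = wrap(name, h)
	}
	return h
}

// demoOrder runs the pipeline once and returns the call order.
func demoOrder() string {
	var trace []string
	pipeline := build([]string{"Retry", "Backoff", "Timeout", "Circuit Breaker", "Rate Limit"})
	pipeline(&trace)
	return strings.Join(trace, " → ")
}

func main() {
	fmt.Println(demoOrder())
}
```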
Middleware
Add custom processing with middleware:
herald.WithMiddleware(
	herald.UseApply[Order]("validate", func(ctx context.Context, env *herald.Envelope[Order]) (*herald.Envelope[Order], error) {
		if env.Value.Total < 0 {
			return nil, errors.New("invalid total")
		}
		return env, nil
	}),
	herald.UseEffect[Order]("log", func(ctx context.Context, env *herald.Envelope[Order]) error {
		log.Printf("Processing order %s", env.Value.ID)
		return nil
	}),
	herald.UseTransform[Order]("enrich", func(ctx context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
		env.Metadata["processed-at"] = time.Now().Format(time.RFC3339)
		return env
	}),
)
| Processor | Purpose |
|---|---|
| UseApply | Transform with possible error |
| UseEffect | Side effect, envelope unchanged |
| UseTransform | Pure transform, cannot fail |
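The three processor kinds differ only in their function signature. One way to picture this, assuming a common `processor` shape that each kind is adapted to (`apply`, `effect`, `transform`, and `run` are hypothetical helpers, not Herald or pipz functions):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Envelope[T any] struct {
	Value    T
	Metadata map[string]string
}

type Order struct {
	ID    string
	Total float64
}

// processor is a hypothetical common shape for all three kinds.
type processor[T any] func(context.Context, *Envelope[T]) (*Envelope[T], error)

// apply: may change the envelope and may fail.
func apply[T any](fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) processor[T] {
	return fn
}

// effect: runs a side effect; the envelope passes through unchanged.
func effect[T any](fn func(context.Context, *Envelope[T]) error) processor[T] {
	return func(ctx context.Context, env *Envelope[T]) (*Envelope[T], error) {
		if err := fn(ctx, env); err != nil {
			return nil, err
		}
		return env, nil
	}
}

// transform: changes the envelope and cannot fail.
func transform[T any](fn func(context.Context, *Envelope[T]) *Envelope[T]) processor[T] {
	return func(ctx context.Context, env *Envelope[T]) (*Envelope[T], error) {
		return fn(ctx, env), nil
	}
}

// run applies processors in order and stops at the first error.
func run[T any](ctx context.Context, env *Envelope[T], ps ...processor[T]) (*Envelope[T], error) {
	var err error
	for _, p := range ps {
		if env, err = p(ctx, env); err != nil {
			return nil, err
		}
	}
	return env, nil
}

// demoProcessors validates, logs, and enriches an order with the given total.
func demoProcessors(total float64) (string, error) {
	logged := 0
	validate := apply[Order](func(_ context.Context, env *Envelope[Order]) (*Envelope[Order], error) {
		if env.Value.Total < 0 {
			return nil, errors.New("invalid total")
		}
		return env, nil
	})
	logIt := effect[Order](func(_ context.Context, _ *Envelope[Order]) error {
		logged++ // stands in for a log call
		return nil
	})
	enrich := transform[Order](func(_ context.Context, env *Envelope[Order]) *Envelope[Order] {
		env.Metadata["stage"] = "enriched"
		return env
	})
	env := &Envelope[Order]{Value: Order{ID: "A1", Total: total}, Metadata: map[string]string{}}
	out, err := run[Order](context.Background(), env, validate, logIt, enrich)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%s logged=%d", out.Metadata["stage"], logged), nil
}

func main() {
	fmt.Println(demoProcessors(10))
	fmt.Println(demoProcessors(-1))
}
```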
Codecs
Codecs handle serialization between typed values and bytes:
type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	ContentType() string
}
Herald defaults to JSON. Use custom codecs for other formats:
pub := herald.NewPublisher(provider, signal, key, opts,
	herald.WithPublisherCodec[Order](myProtobufCodec))
sub := herald.NewSubscriber(provider, signal, key, opts,
	herald.WithSubscriberCodec[Order](myProtobufCodec))
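A custom codec only needs the three methods above. As an illustration, here is a sketch that uses the standard library's `encoding/gob` in place of Protobuf; `gobCodec`, `roundTrip`, and the content-type string are assumptions made for this example.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	ContentType() string
}

// gobCodec is an illustrative codec built on encoding/gob.
type gobCodec struct{}

var _ Codec = gobCodec{} // compile-time interface check

func (gobCodec) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func (gobCodec) Unmarshal(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// ContentType returns an assumed label; choose whatever your consumers expect.
func (gobCodec) ContentType() string { return "application/x-gob" }

type Order struct {
	ID    string
	Total int
}

// roundTrip encodes and decodes an order through the given codec.
func roundTrip(c Codec, in Order) (Order, error) {
	data, err := c.Marshal(in)
	if err != nil {
		return Order{}, err
	}
	var out Order
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(gobCodec{}, Order{ID: "A1", Total: 3})
	fmt.Println(out, err, gobCodec{}.ContentType())
}
```

Publisher and subscriber must be configured with matching codecs; otherwise the subscriber's Unmarshal will fail on every message.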
Acknowledgment
Herald handles acknowledgment automatically:
| Outcome | Action |
|---|---|
| Successful processing | Ack() called |
| Deserialization failure | Nack() called |
| Pipeline failure | Nack() called |
Each provider maps ack/nack to broker-appropriate behavior:
| Provider | Ack | Nack |
|---|---|---|
| Kafka | Commit offset | Don't commit |
| JetStream | msg.Ack() | msg.Nak() |
| Pub/Sub | msg.Ack() | msg.Nack() |
| SQS | Delete message | Visibility timeout |
| AMQP | Ack(false) | Nack(false, true) |
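The outcome table reduces to a small decision function. The sketch below assumes JSON payloads and a hypothetical `handle` helper; it is not Herald's subscriber code, only the Ack/Nack rule it describes.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type Message struct {
	Data []byte
	Ack  func() error
	Nack func() error
}

type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// handle deserializes the message, runs the pipeline, and then calls Ack on
// success or Nack on either kind of failure.
func handle(msg Message, pipeline func(Order) error) error {
	var o Order
	if err := json.Unmarshal(msg.Data, &o); err != nil {
		_ = msg.Nack() // deserialization failure
		return err
	}
	if err := pipeline(o); err != nil {
		_ = msg.Nack() // pipeline failure
		return err
	}
	return msg.Ack() // successful processing
}

// outcome reports which callback fired for the given raw payload.
func outcome(data string) string {
	result := "none"
	msg := Message{
		Data: []byte(data),
		Ack:  func() error { result = "ack"; return nil },
		Nack: func() error { result = "nack"; return nil },
	}
	_ = handle(msg, func(o Order) error {
		if o.Total < 0 {
			return errors.New("invalid total")
		}
		return nil
	})
	return result
}

func main() {
	fmt.Println(outcome(`{"id":"A1","total":5}`))
	fmt.Println(outcome(`not json`))
	fmt.Println(outcome(`{"id":"A1","total":-1}`))
}
```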
Error Handling
All errors flow through capitan's event system:
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
	err, _ := herald.ErrorKey.From(e)
	log.Printf("[herald] %s: %v", err.Operation, err.Err)
})
The error struct includes context:
type Error struct {
	Operation string // "publish", "subscribe", "unmarshal", "ack", "nack"
	Signal    string // Signal name
	Err       error  // Underlying error
	Raw       []byte // Raw payload (for unmarshal errors)
}
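Because Operation is a plain string, an error hook can branch on it. The routing policy in this sketch (dead-letter, alert, log) is purely illustrative and `route` is a hypothetical helper; only the struct fields come from the definition above.

```go
package main

import (
	"errors"
	"fmt"
)

type Error struct {
	Operation string
	Signal    string
	Err       error
	Raw       []byte
}

// route picks a handling strategy from the Operation field.
func route(e Error) string {
	switch e.Operation {
	case "unmarshal":
		// Raw carries the undecodable payload, so it can be set aside.
		return fmt.Sprintf("dead-letter %d bytes from %s", len(e.Raw), e.Signal)
	case "publish", "subscribe":
		return "alert: " + e.Err.Error()
	default: // "ack", "nack"
		return "log: " + e.Err.Error()
	}
}

// demoRoute routes one error of each family.
func demoRoute() []string {
	cause := errors.New("boom")
	return []string{
		route(Error{Operation: "unmarshal", Signal: "order.created", Err: cause, Raw: []byte("{oops")}),
		route(Error{Operation: "publish", Signal: "order.created", Err: cause}),
		route(Error{Operation: "ack", Signal: "order.created", Err: cause}),
	}
}

func main() {
	for _, line := range demoRoute() {
		fmt.Println(line)
	}
}
```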
Metadata
Metadata flows through the entire pipeline:
Publishing: Middleware can add headers → Provider maps to broker headers
herald.UseTransform[Order]("headers", func(_ context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
	env.Metadata["correlation-id"] = correlationID
	return env
})
Subscribing: Broker headers → Metadata → Available in capitan event
capitan.Hook(signal, func(ctx context.Context, e *capitan.Event) {
	meta, _ := herald.MetadataKey.From(e)
	correlationID := meta["correlation-id"]
})
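The mapping between metadata and broker headers can be sketched as a pair of conversions. The `header` type below mimics a generic key/value header and is hypothetical, as is the assumption that `Metadata` is a string map; each provider does its own mapping.

```go
package main

import (
	"fmt"
	"sort"
)

// Metadata is assumed to be a string map; the source does not define it.
type Metadata map[string]string

// header mimics a broker-native key/value header (hypothetical).
type header struct {
	Key   string
	Value []byte
}

// toHeaders converts metadata to headers in sorted key order.
func toHeaders(md Metadata) []header {
	keys := make([]string, 0, len(md))
	for k := range md {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic output
	hs := make([]header, 0, len(md))
	for _, k := range keys {
		hs = append(hs, header{Key: k, Value: []byte(md[k])})
	}
	return hs
}

// fromHeaders rebuilds metadata on the subscribing side.
func fromHeaders(hs []header) Metadata {
	md := make(Metadata, len(hs))
	for _, h := range hs {
		md[h.Key] = string(h.Value)
	}
	return md
}

// demoMetadata round-trips a correlation ID through the header form.
func demoMetadata() string {
	out := fromHeaders(toHeaders(Metadata{"correlation-id": "abc-123"}))
	return out["correlation-id"]
}

func main() {
	fmt.Println(demoMetadata())
}
```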
Next Steps
- Architecture — Internal design and data flow
- Publishing Guide — Detailed publishing patterns
- Subscribing Guide — Detailed subscribing patterns