# Subscribing

Subscribers consume messages from external brokers and emit them as capitan events.

## Basic Usage
```go
package main

import (
	"context"
	"fmt"

	kafkago "github.com/segmentio/kafka-go"
	"github.com/zoobz-io/capitan"
	"github.com/zoobz-io/herald"
	"github.com/zoobz-io/herald/kafka"
)

type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

func main() {
	ctx := context.Background()

	// Define signal and key
	orderCreated := capitan.NewSignal("order.created", "New order placed")
	orderKey := capitan.NewKey[Order]("order", "app.Order")

	// Create provider
	reader := kafkago.NewReader(kafkago.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "orders",
		GroupID: "order-processor",
	})
	provider := kafka.New("orders", kafka.WithReader(reader))
	defer provider.Close()

	// Hook listener before starting subscriber
	capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
		order, ok := orderKey.From(e)
		if ok {
			fmt.Printf("Received order: %s ($%.2f)\n", order.ID, order.Total)
		}
	})

	// Create and start subscriber
	sub := herald.NewSubscriber(provider, orderCreated, orderKey, nil)
	sub.Start(ctx)
	defer sub.Close()

	// Block until shutdown
	select {}
}
```
## How It Works

- **Subscriber starts consuming** — When `sub.Start(ctx)` is called, the subscriber spawns a goroutine that calls `provider.Subscribe(ctx)`.
- **Message received** — The provider returns messages through a channel.
- **Deserialize and wrap** — The subscriber unmarshals the message bytes into the typed value and wraps it in an Envelope with the broker's metadata.
- **Process through pipeline** — The Envelope passes through any configured middleware, which can read/modify metadata.
- **Emit to capitan** — The typed value and metadata are emitted as fields on the capitan event.
- **Acknowledge** — On successful emission, the message is acknowledged. On failure, it's nack'd for redelivery.
## Subscriber Options

### Pipeline Options

Add reliability features:
```go
sub := herald.NewSubscriber(provider, signal, key, []herald.Option[Order]{
	herald.WithTimeout[Order](5 * time.Second),
	herald.WithRateLimit[Order](100, 10),
})
```
### Custom Codec

Use a different serialization format:
```go
sub := herald.NewSubscriber(provider, signal, key, nil,
	herald.WithSubscriberCodec[Order](myProtobufCodec))
```
### Custom Capitan Instance

Use an isolated capitan instance:
```go
c := capitan.New()
sub := herald.NewSubscriber(provider, signal, key, nil,
	herald.WithSubscriberCapitan[Order](c))
```
## Metadata Access

Message metadata is available in two ways:

### In Capitan Hooks

Metadata is emitted as a field on the capitan event:
```go
capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
	order, _ := orderKey.From(e)
	meta, _ := herald.MetadataKey.From(e)

	correlationID := meta["correlation-id"]
	traceID := meta["trace-id"]

	processOrder(ctx, order, correlationID)
})
```
### In Middleware

Access metadata via the Envelope for processing before emission:
```go
opts := []herald.Option[Order]{
	herald.WithMiddleware(
		herald.UseEffect[Order]("log-headers", func(_ context.Context, env *herald.Envelope[Order]) error {
			log.Printf("Processing order with trace %s", env.Metadata["trace-id"])
			return nil
		}),
	),
}
sub := herald.NewSubscriber(provider, orderCreated, orderKey, opts)
```
## Acknowledgment Semantics
Herald handles acknowledgment automatically:
| Outcome | Action |
|---|---|
| Successful deserialization + emit | Ack() called |
| Deserialization failure | Nack() called |
| Pipeline failure | Nack() called |
Each provider implements broker-appropriate ack/nack behavior:
| Provider | Ack | Nack |
|---|---|---|
| Kafka | Commit offset | Don't commit (redelivered) |
| JetStream | msg.Ack() | msg.Nak() (redelivered) |
| Pub/Sub | msg.Ack() | msg.Nack() |
| Redis | XACK | Remains pending |
| SQS | Delete message | Message returns after visibility timeout |
| AMQP | Ack(false) | Nack(false, true) with requeue |
## Error Handling
Deserialization and processing errors are emitted as capitan events:
```go
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
	err, _ := herald.ErrorKey.From(e)

	switch err.Operation {
	case "unmarshal":
		log.Printf("Failed to deserialize message: %v", err.Err)
		log.Printf("Raw payload: %s", err.Raw)
	case "subscribe":
		log.Printf("Subscription error: %v", err.Err)
	case "ack":
		log.Printf("Failed to acknowledge: %v", err.Err)
	case "nack":
		log.Printf("Failed to nack: %v", err.Err)
	}
})
```
## Lifecycle
```go
// Create subscriber
sub := herald.NewSubscriber(provider, signal, key, nil)

// Start consuming (spawns goroutine)
ctx, cancel := context.WithCancel(context.Background())
sub.Start(ctx)

// ... process messages ...

// Stop consuming
cancel()    // Signal goroutine to stop
sub.Close() // Wait for cleanup
```
### Graceful Shutdown

The subscriber responds to context cancellation. The shutdown order matters:
```go
ctx, cancel := context.WithCancel(context.Background())
sub.Start(ctx)

// On shutdown signal (in this exact order):
cancel()           // 1. Stop consuming new messages
sub.Close()        // 2. Wait for in-flight messages to complete
capitan.Shutdown() // 3. Drain any queued capitan events
provider.Close()   // 4. Close broker connection
```
Why this order:

- `cancel()` signals the subscribe goroutine to stop fetching new messages
- `sub.Close()` waits for any message currently being processed (deserialization, pipeline, ack/nack)
- `capitan.Shutdown()` ensures all emitted events reach their hooks
- `provider.Close()` releases broker resources after all operations complete
**Common mistake:** Closing the provider before the subscriber can cause ack/nack operations to fail on in-flight messages.
## Best Practices

### Hook Listeners Before Starting

Register capitan listeners before starting the subscriber to avoid missing events:
```go
// First: set up listeners
capitan.Hook(signal, handler)

// Then: start subscriber
sub.Start(ctx)
```
### Use Consumer Groups

For scalable consumption, use broker consumer groups:
```go
// Kafka
reader := kafkago.NewReader(kafkago.ReaderConfig{
	GroupID: "order-processor", // Consumer group
	// ...
})

// Redis
provider := redis.New("stream", redis.WithGroup("processor-group"))
```
### Handle Poison Messages

Messages that consistently fail deserialization will be nack'd repeatedly. Consider:
- Dead letter queues at the broker level
- Error signal handlers that log problematic payloads
- Circuit breakers to pause consumption on repeated failures