zoobzio January 2, 2026

Subscribing

Subscribers consume messages from external brokers and emit them as capitan events.

Basic Usage

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    kafkago "github.com/segmentio/kafka-go"
    "github.com/zoobz-io/capitan"
    "github.com/zoobz-io/herald"
    "github.com/zoobz-io/herald/kafka"
)

type Order struct {
    ID    string  `json:"id"`
    Total float64 `json:"total"`
}

func main() {
    // Cancel ctx on Ctrl+C so the deferred cleanup below actually runs
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    // Define signal and key
    orderCreated := capitan.NewSignal("order.created", "New order placed")
    orderKey := capitan.NewKey[Order]("order", "app.Order")

    // Create provider
    reader := kafkago.NewReader(kafkago.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "orders",
        GroupID: "order-processor",
    })
    provider := kafka.New("orders", kafka.WithReader(reader))
    defer provider.Close()

    // Hook listener before starting subscriber
    capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
        order, ok := orderKey.From(e)
        if ok {
            fmt.Printf("Received order: %s ($%.2f)\n", order.ID, order.Total)
        }
    })

    // Create and start subscriber
    sub := herald.NewSubscriber(provider, orderCreated, orderKey, nil)
    sub.Start(ctx)
    defer sub.Close()

    // Block until interrupted; the deferred calls then close the
    // subscriber before the provider (defers run in reverse order)
    <-ctx.Done()
}

How It Works

  1. Subscriber starts consuming — When sub.Start(ctx) is called, the subscriber spawns a goroutine that calls provider.Subscribe(ctx).
  2. Message received — The provider returns messages through a channel.
  3. Deserialize and wrap — The subscriber unmarshals the message bytes into the typed value and wraps it in an Envelope with the broker's metadata.
  4. Process through pipeline — The Envelope passes through any configured middleware, which can read/modify metadata.
  5. Emit to capitan — The typed value and metadata are emitted as fields on the capitan event.
  6. Acknowledge — On successful emission, the message is acknowledged. On failure, it's nack'd for redelivery.
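The steps above can be sketched with the standard library alone. This is a minimal illustration of the consume, deserialize, emit, ack/nack flow, not herald's implementation: the `Message` type, the `consume` function, and the `emit` callback are hypothetical names introduced for this example, and the middleware step is omitted.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message is a hypothetical stand-in for a broker message; it is not herald's type.
type Message struct {
	Data     []byte
	Metadata map[string]string
	Ack      func() error
	Nack     func() error
}

// Order mirrors the payload type used in the Basic Usage example.
type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// consume drains msgs until the channel closes or ctx is cancelled.
// Each message is unmarshalled, handed to emit, then acked or nacked.
func consume(ctx context.Context, msgs <-chan Message, emit func(Order, map[string]string) error) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-msgs:
			if !ok {
				return
			}
			var order Order
			err := json.Unmarshal(msg.Data, &order)
			if err == nil {
				err = emit(order, msg.Metadata)
			}
			if err != nil {
				_ = msg.Nack() // leave the message for redelivery
				continue
			}
			_ = msg.Ack()
		}
	}
}

// runDemo feeds one valid and one malformed payload through consume
// and reports how many messages were acked and nacked.
func runDemo() (acked, nacked int) {
	msgs := make(chan Message, 2)
	newMsg := func(data string) Message {
		return Message{
			Data: []byte(data),
			Ack:  func() error { acked++; return nil },
			Nack: func() error { nacked++; return nil },
		}
	}
	msgs <- newMsg(`{"id":"A1","total":9.5}`)
	msgs <- newMsg(`not json`)
	close(msgs)

	consume(context.Background(), msgs, func(o Order, _ map[string]string) error {
		fmt.Printf("emit order %s\n", o.ID)
		return nil
	})
	return acked, nacked
}

func main() {
	acked, nacked := runDemo()
	fmt.Printf("acked=%d nacked=%d\n", acked, nacked)
}
```

The malformed payload fails at the unmarshal step and is nacked without ever reaching `emit`, which matches the acknowledgment rules described later on this page.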

Subscriber Options

Pipeline Options

Pass pipeline options to add reliability features such as timeouts and rate limiting:

sub := herald.NewSubscriber(provider, signal, key, []herald.Option[Order]{
    herald.WithTimeout[Order](5*time.Second),
    herald.WithRateLimit[Order](100, 10),
})

Custom Codec

Use a different serialization format:

sub := herald.NewSubscriber(provider, signal, key, nil,
    herald.WithSubscriberCodec[Order](myProtobufCodec))
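For a sense of what a codec involves, here is a minimal sketch built on the standard library's encoding/gob. The interface herald expects from a codec is not shown on this page, so the `Codec` interface below (a `Marshal`/`Unmarshal` pair) is an assumption made for illustration; check herald's codec documentation for the real method set before adapting it.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Codec is a hypothetical interface used only for this sketch;
// it is not taken from herald's API.
type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

// GobCodec encodes values with encoding/gob from the standard library.
type GobCodec struct{}

// Marshal serializes v into a gob-encoded byte slice.
func (GobCodec) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Unmarshal decodes gob-encoded data into v, which must be a pointer.
func (GobCodec) Unmarshal(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// Order mirrors the payload type used throughout this page.
type Order struct {
	ID    string
	Total float64
}

// roundTrip encodes an Order and decodes it back through the codec.
func roundTrip(c Codec, in Order) (Order, error) {
	data, err := c.Marshal(in)
	if err != nil {
		return Order{}, err
	}
	var out Order
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(GobCodec{}, Order{ID: "A1", Total: 9.5})
	fmt.Println(out, err)
}
```

A round-trip check like `roundTrip` is a cheap way to confirm a codec is symmetric before wiring it into a subscriber.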

Custom Capitan Instance

Use an isolated capitan instance:

c := capitan.New()
sub := herald.NewSubscriber(provider, signal, key, nil,
    herald.WithSubscriberCapitan[Order](c))

Metadata Access

Message metadata is available in two ways:

In Capitan Hooks

Metadata is emitted as a field on the capitan event:

capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
    order, ok := orderKey.From(e)
    if !ok {
        return // event carries no order payload
    }
    meta, _ := herald.MetadataKey.From(e)

    correlationID := meta["correlation-id"]
    traceID := meta["trace-id"]
    log.Printf("Processing order %s (trace %s)", order.ID, traceID)
    processOrder(ctx, order, correlationID)
})

In Middleware

Access metadata via the Envelope for processing before emission:

opts := []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseEffect[Order]("log-headers", func(_ context.Context, env *herald.Envelope[Order]) error {
            log.Printf("Processing order with trace %s", env.Metadata["trace-id"])
            return nil
        }),
    ),
}

sub := herald.NewSubscriber(provider, orderCreated, orderKey, opts)

Acknowledgment Semantics

Herald handles acknowledgment automatically:

Outcome                              Action
Successful deserialization + emit    Ack() called
Deserialization failure              Nack() called
Pipeline failure                     Nack() called

Each provider implements broker-appropriate ack/nack behavior:

Provider     Ack               Nack
Kafka        Commit offset     Don't commit (redelivered)
JetStream    msg.Ack()         msg.Nak() (redelivered)
Pub/Sub      msg.Ack()         msg.Nack()
Redis        XACK              Remains pending
SQS          Delete message    Returns after visibility timeout
AMQP         Ack(false)        Nack(false, true) with requeue

Error Handling

Deserialization and processing errors are emitted as capitan events:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)

    switch err.Operation {
    case "unmarshal":
        log.Printf("Failed to deserialize message: %v", err.Err)
        log.Printf("Raw payload: %s", err.Raw)
    case "subscribe":
        log.Printf("Subscription error: %v", err.Err)
    case "ack":
        log.Printf("Failed to acknowledge: %v", err.Err)
    case "nack":
        log.Printf("Failed to nack: %v", err.Err)
    }
})

Lifecycle

// Create subscriber
sub := herald.NewSubscriber(provider, signal, key, nil)

// Start consuming (spawns goroutine)
ctx, cancel := context.WithCancel(context.Background())
sub.Start(ctx)

// ... process messages ...

// Stop consuming
cancel()      // Signal goroutine to stop
sub.Close()   // Wait for cleanup

Graceful Shutdown

The subscriber responds to context cancellation. The shutdown order matters:

ctx, cancel := context.WithCancel(context.Background())
sub.Start(ctx)

// On shutdown signal (in this exact order):
cancel()            // 1. Stop consuming new messages
sub.Close()         // 2. Wait for in-flight messages to complete
capitan.Shutdown()  // 3. Drain any queued capitan events
provider.Close()    // 4. Close broker connection

Why this order:

  1. cancel() signals the subscribe goroutine to stop fetching new messages
  2. sub.Close() waits for any message currently being processed (deserialization, pipeline, ack/nack)
  3. capitan.Shutdown() ensures all emitted events reach their hooks
  4. provider.Close() releases broker resources after all operations complete

Common mistake: Closing the provider before the subscriber can cause ack/nack operations to fail on in-flight messages.
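One way to make the order hard to get wrong is to put it in a single helper. The sketch below uses only the standard library; `step`, `shutdown`, and `demo` are hypothetical names, and the no-op functions stand in for `sub.Close`, `capitan.Shutdown`, and `provider.Close`, whose exact return types are not shown on this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// step pairs a name with a teardown function.
type step struct {
	name string
	fn   func() error
}

// shutdown cancels the context first, then runs every step in order.
// It keeps going after a failure so later resources are still released,
// and returns the executed step names plus any errors, joined.
func shutdown(cancel context.CancelFunc, steps ...step) ([]string, error) {
	cancel()
	order := []string{"cancel"}
	var errs []error
	for _, s := range steps {
		if err := s.fn(); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", s.name, err))
		}
		order = append(order, s.name)
	}
	return order, errors.Join(errs...)
}

// demo wires stand-ins for the real teardown calls; the last one fails
// to show that an error does not interrupt the sequence.
func demo() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	_ = ctx // in a real service, ctx would be passed to sub.Start

	noop := func() error { return nil }
	return shutdown(cancel,
		step{"sub.Close", noop},
		step{"capitan.Shutdown", noop},
		step{"provider.Close", func() error { return errors.New("already closed") }},
	)
}

func main() {
	order, err := demo()
	fmt.Println(order)
	fmt.Println("error:", err)
}
```

In a long-running service, the call to `shutdown` would typically follow a wait on a context created with `signal.NotifyContext`, so an interrupt triggers the whole sequence.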

Best Practices

Hook Listeners Before Starting

Register capitan listeners before starting the subscriber to avoid missing events:

// First: set up listeners
capitan.Hook(signal, handler)

// Then: start subscriber
sub.Start(ctx)

Use Consumer Groups

For scalable consumption, use broker consumer groups:

// Kafka
reader := kafkago.NewReader(kafkago.ReaderConfig{
    GroupID: "order-processor",  // Consumer group
    // ...
})

// Redis
provider := redis.New("stream", redis.WithGroup("processor-group"))

Handle Poison Messages

Messages that consistently fail deserialization will be nack'd repeatedly. Consider:

  • Dead letter queues at the broker level
  • Error signal handlers that log problematic payloads
  • Circuit breakers to pause consumption on repeated failures
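As a rough illustration of the options above, the sketch below counts failures per message and reports when a message has failed often enough to be treated as poison. `FailureTracker` is a hypothetical helper, not part of herald, and it assumes the broker exposes some stable message identifier to key on.

```go
package main

import (
	"fmt"
	"sync"
)

// FailureTracker counts failures per message ID. It is a hypothetical
// helper for this sketch and is safe for concurrent use.
type FailureTracker struct {
	mu     sync.Mutex
	limit  int
	counts map[string]int
}

// NewFailureTracker returns a tracker that trips after limit failures.
func NewFailureTracker(limit int) *FailureTracker {
	return &FailureTracker{limit: limit, counts: make(map[string]int)}
}

// Fail records one failure for id and reports whether the message has
// reached the limit. The count resets once the limit is reached.
func (t *FailureTracker) Fail(id string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.counts[id]++
	if t.counts[id] < t.limit {
		return false
	}
	delete(t.counts, id)
	return true
}

// Succeed clears the failure count for id.
func (t *FailureTracker) Succeed(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.counts, id)
}

func main() {
	tracker := NewFailureTracker(3)
	for attempt := 1; attempt <= 3; attempt++ {
		if tracker.Fail("msg-42") {
			fmt.Printf("attempt %d: route msg-42 to a dead letter queue\n", attempt)
		} else {
			fmt.Printf("attempt %d: nack msg-42 for redelivery\n", attempt)
		}
	}
}
```

The same counter could drive a circuit breaker instead: rather than dead-lettering one message, pause consumption when many different messages trip the limit in a short window.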