zoobzio January 2, 2026 Edit this page

Publishing

Publishers observe capitan signals and forward events to external message brokers.

Basic Usage

package main

import (
    "context"

    kafkago "github.com/segmentio/kafka-go"
    "github.com/zoobz-io/capitan"
    "github.com/zoobz-io/herald"
    "github.com/zoobz-io/herald/kafka"
)

type Order struct {
    ID    string  `json:"id"`
    Total float64 `json:"total"`
}

func main() {
    ctx := context.Background()

    // Define signal and key
    orderCreated := capitan.NewSignal("order.created", "New order placed")
    orderKey := capitan.NewKey[Order]("order", "app.Order")

    // Create provider
    writer := &kafkago.Writer{
        Addr:  kafkago.TCP("localhost:9092"),
        Topic: "orders",
    }
    provider := kafka.New("orders", kafka.WithWriter(writer))
    defer provider.Close()

    // Create and start publisher
    pub := herald.NewPublisher(provider, orderCreated, orderKey, nil)
    pub.Start()
    defer pub.Close()

    // Emit events - automatically published to Kafka
    capitan.Emit(ctx, orderCreated, orderKey.Field(Order{
        ID:    "ORD-123",
        Total: 99.99,
    }))

    capitan.Shutdown()
}

How It Works

  1. Publisher observes signal — When pub.Start() is called, the publisher registers as a capitan listener for the specified signal.
  2. Event emitted — When capitan.Emit() is called with the signal, the publisher's listener receives the event.
  3. Extract and wrap — The publisher extracts the typed value using the key and wraps it in an Envelope with empty metadata.
  4. Process through pipeline — The Envelope passes through any configured middleware, which can add metadata headers.
  5. Serialize and publish — The terminal marshals the value to bytes and sends it to the broker with the Envelope's metadata.

Publisher Options

Pipeline Options

Add reliability features via pipeline options:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRetry[Order](3),                          // Retry up to 3 times
    herald.WithBackoff[Order](3, 100*time.Millisecond),  // Exponential backoff
    herald.WithTimeout[Order](5*time.Second),            // Timeout per operation
    herald.WithCircuitBreaker[Order](5, 30*time.Second), // Circuit breaker
    herald.WithRateLimit[Order](100, 10),                // Rate limiting
})

Custom Codec

Use a different serialization format:

pub := herald.NewPublisher(provider, signal, key, nil,
    herald.WithPublisherCodec[Order](myProtobufCodec))

Custom Capitan Instance

Use an isolated capitan instance instead of the global singleton:

c := capitan.New()
pub := herald.NewPublisher(provider, signal, key, nil,
    herald.WithPublisherCapitan[Order](c))

Metadata Propagation

Use middleware to attach metadata that flows through to the broker:

// Middleware to add headers
opts := []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseTransform[Order]("add-headers", func(_ context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
            env.Metadata["correlation-id"] = "abc-123"
            env.Metadata["trace-id"] = requestID
            return env
        }),
    ),
}

pub := herald.NewPublisher(provider, orderCreated, orderKey, opts)

The metadata maps to broker-native headers (Kafka headers, AMQP properties, SQS attributes, etc.).

Error Handling

Publishing errors are emitted as capitan events:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)
    if err.Operation == "publish" {
        log.Printf("Failed to publish to %s: %v", err.Signal, err.Err)
    }
})

The error event includes:

  • Operation: "publish"
  • Signal: The signal name that failed
  • Err: The underlying error

Lifecycle

// Create publisher
pub := herald.NewPublisher(provider, signal, key, nil)

// Start observing (registers capitan listener)
pub.Start()

// ... emit events ...

// Stop observing and cleanup
pub.Close()

Always call Close() to unregister the listener and release resources.

Best Practices

One Direction Per Node

A node should be either a Publisher OR Subscriber for a given signal, never both. This prevents event loops in distributed topologies.

// Service A: Publishes order events
pub := herald.NewPublisher(provider, orderCreated, orderKey, nil)

// Service B: Subscribes to order events (different process)
sub := herald.NewSubscriber(provider, orderCreated, orderKey, nil)

Graceful Shutdown

Drain capitan before closing the publisher:

capitan.Shutdown() // Wait for pending events to process
pub.Close()        // Then close publisher
provider.Close()   // Finally close provider

Handle Serialization Errors

If marshaling fails, an error event is emitted. Ensure your types are JSON-serializable (or match your codec):

type Order struct {
    ID    string  `json:"id"`
    Total float64 `json:"total"`
    // Unexported fields are not serialized
}