Publishing
Publishers observe capitan signals and forward events to external message brokers.
Basic Usage
package main

import (
	"context"

	kafkago "github.com/segmentio/kafka-go"
	"github.com/zoobz-io/capitan"
	"github.com/zoobz-io/herald"
	"github.com/zoobz-io/herald/kafka"
)

type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

func main() {
	ctx := context.Background()

	// Define signal and key
	orderCreated := capitan.NewSignal("order.created", "New order placed")
	orderKey := capitan.NewKey[Order]("order", "app.Order")

	// Create provider
	writer := &kafkago.Writer{
		Addr:  kafkago.TCP("localhost:9092"),
		Topic: "orders",
	}
	provider := kafka.New("orders", kafka.WithWriter(writer))
	defer provider.Close()

	// Create and start publisher
	pub := herald.NewPublisher(provider, orderCreated, orderKey, nil)
	pub.Start()
	defer pub.Close()

	// Emit events - automatically published to Kafka
	capitan.Emit(ctx, orderCreated, orderKey.Field(Order{
		ID:    "ORD-123",
		Total: 99.99,
	}))

	// Drain pending events before the deferred Close calls run
	capitan.Shutdown()
}
How It Works
- Publisher observes signal: When pub.Start() is called, the publisher registers as a capitan listener for the specified signal.
- Event emitted: When capitan.Emit() is called with the signal, the publisher's listener receives the event.
- Extract and wrap: The publisher extracts the typed value using the key and wraps it in an Envelope with empty metadata.
- Process through pipeline: The Envelope passes through any configured middleware, which can add metadata headers.
- Serialize and publish: The terminal marshals the value to bytes and sends it to the broker with the Envelope's metadata.
Publisher Options
Pipeline Options
Add reliability features via pipeline options:
pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
	herald.WithRetry[Order](3),                          // Retry up to 3 times
	herald.WithBackoff[Order](3, 100*time.Millisecond),  // Exponential backoff
	herald.WithTimeout[Order](5*time.Second),            // Timeout per operation
	herald.WithCircuitBreaker[Order](5, 30*time.Second), // Circuit breaker
	herald.WithRateLimit[Order](100, 10),                // Rate limiting
})
Custom Codec
Use a different serialization format:
pub := herald.NewPublisher(provider, signal, key, nil,
	herald.WithPublisherCodec[Order](myProtobufCodec))
Custom Capitan Instance
Use an isolated capitan instance instead of the global singleton:
c := capitan.New()
pub := herald.NewPublisher(provider, signal, key, nil,
	herald.WithPublisherCapitan[Order](c))
Metadata Propagation
Use middleware to attach metadata that flows through to the broker:
// Middleware to add headers
opts := []herald.Option[Order]{
	herald.WithMiddleware(
		herald.UseTransform[Order]("add-headers", func(_ context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
			env.Metadata["correlation-id"] = "abc-123"
			env.Metadata["trace-id"] = requestID // requestID is assumed to be defined by the surrounding code
			return env
		}),
	),
}
pub := herald.NewPublisher(provider, orderCreated, orderKey, opts)
The metadata maps to broker-native headers (Kafka headers, AMQP properties, SQS attributes, etc.).
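As an illustration of that mapping, the sketch below converts a metadata map into a slice of key/value headers. The local `Header` type is a stand-in shaped like a Kafka record header (string key, byte-slice value), and `toHeaders` is a hypothetical helper. How each herald provider actually performs the conversion is not shown here.

```go
package main

import (
	"fmt"
	"sort"
)

// Header is a local stand-in shaped like a Kafka record header.
type Header struct {
	Key   string
	Value []byte
}

// toHeaders converts envelope metadata into a header slice, sorted by key
// so that the result is deterministic.
func toHeaders(metadata map[string]string) []Header {
	keys := make([]string, 0, len(metadata))
	for k := range metadata {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	headers := make([]Header, 0, len(keys))
	for _, k := range keys {
		headers = append(headers, Header{Key: k, Value: []byte(metadata[k])})
	}
	return headers
}

func main() {
	md := map[string]string{"trace-id": "req-42", "correlation-id": "abc-123"}
	for _, h := range toHeaders(md) {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
}
```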
Error Handling
Publishing errors are emitted as capitan events:
capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
	err, _ := herald.ErrorKey.From(e)
	if err.Operation == "publish" {
		log.Printf("Failed to publish to %s: %v", err.Signal, err.Err)
	}
})
The error event includes:
- Operation: "publish"
- Signal: The signal name that failed
- Err: The underlying error
Lifecycle
// Create publisher
pub := herald.NewPublisher(provider, signal, key, nil)
// Start observing (registers capitan listener)
pub.Start()
// ... emit events ...
// Stop observing and cleanup
pub.Close()
Always call Close() to unregister the listener and release resources.
Best Practices
One Direction Per Node
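A node should be either a Publisher or a Subscriber for a given signal, never both. A node that did both would re-emit each message it received from the broker, and its publisher would then send that message back to the broker. Keeping each node to one direction prevents such event loops in distributed topologies.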
// Service A: Publishes order events
pub := herald.NewPublisher(provider, orderCreated, orderKey, nil)
// Service B: Subscribes to order events (different process)
sub := herald.NewSubscriber(provider, orderCreated, orderKey, nil)
Graceful Shutdown
Drain capitan before closing the publisher:
capitan.Shutdown() // Wait for pending events to process
pub.Close() // Then close publisher
provider.Close() // Finally close provider
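The reason for this ordering can be shown with a toy model. In the sketch below, `sink` is a hypothetical stand-in for a provider that rejects writes once closed, and `drain` stands in for delivering the events still pending at shutdown. Neither is part of herald or capitan.

```go
package main

import (
	"errors"
	"fmt"
)

// sink is a hypothetical provider stand-in that rejects writes once closed.
type sink struct {
	closed    bool
	delivered []string
}

func (s *sink) Publish(msg string) error {
	if s.closed {
		return errors.New("sink closed")
	}
	s.delivered = append(s.delivered, msg)
	return nil
}

func (s *sink) Close() { s.closed = true }

// drain forwards every pending event to the sink and counts the failures.
func drain(pending []string, s *sink) (failed int) {
	for _, msg := range pending {
		if err := s.Publish(msg); err != nil {
			failed++
		}
	}
	return failed
}

func main() {
	pending := []string{"order-1", "order-2"}

	// Drain first, then close: nothing is lost.
	good := &sink{}
	fmt.Println("drain then close, failed:", drain(pending, good))
	good.Close()

	// Close first, then drain: every pending event fails.
	bad := &sink{}
	bad.Close()
	fmt.Println("close then drain, failed:", drain(pending, bad))
}
```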
Handle Serialization Errors
If marshaling fails, an error event is emitted. Ensure your types are JSON-serializable (or match your codec):
type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
	// Unexported fields are not serialized
}