Testing
Herald provides utilities for testing applications that use message bridging.
MockProvider
The testing package provides a mock provider for unit tests:
import heraldtest "github.com/zoobz-io/herald/testing"
// TestOrderPublishing verifies that an event emitted on orderSignal is
// forwarded to the provider exactly once. orderSignal, orderKey and Order are
// assumed to be declared elsewhere in the test package.
func TestOrderPublishing(t *testing.T) {
	ctx := context.Background()

	// Create mock provider
	mock := heraldtest.NewMockProvider()

	// Create publisher with mock
	pub := herald.NewPublisher(mock, orderSignal, orderKey, nil)
	pub.Start()

	// Emit event
	capitan.Emit(ctx, orderSignal, orderKey.Field(Order{ID: "123"}))

	// Shut capitan down first so the emitted event is processed before asserting.
	capitan.Shutdown()
	pub.Close()

	// Verify. Fatalf (not Errorf) because the next step indexes the results.
	if got := mock.PublishCount(); got != 1 {
		t.Fatalf("expected 1 publish, got %d", got)
	}

	// Assert on published[0].Data and published[0].Metadata
	published := mock.Published()
	if len(published[0].Data) == 0 {
		t.Error("expected a non-empty payload")
	}
}
MockProvider API
MockProvider uses a fluent builder pattern:
// Create basic mock
mock := heraldtest.NewMockProvider()

// Or configure with fluent methods (each returns *MockProvider for chaining).
// Plain assignment is used because mock is already declared above.
subCh := make(chan herald.Result[herald.Message], 10)
mock = heraldtest.NewMockProvider().
	WithSubscribeChannel(subCh).
	WithPublishCallback(func(data []byte, meta herald.Metadata) {
		// Custom logic on each publish
	})

// Query state
mock.Published()    // []PublishedMessage - all published messages
mock.PublishCount() // int - number of published messages
mock.IsClosed()     // bool - whether Close() was called
mock.Reset()        // Clear published messages
Testing Publishers
Basic Publisher Test
// TestPublisher_EmitsToProvider emits a single Order event and checks that the
// mock provider received it with the expected JSON payload.
func TestPublisher_EmitsToProvider(t *testing.T) {
	mock := heraldtest.NewMockProvider()
	signal := capitan.NewSignal("test.pub", "Test")
	key := capitan.NewKey[Order]("order", "test.Order")

	pub := herald.NewPublisher(mock, signal, key, nil)
	// Start takes no arguments in the other publisher examples in this guide.
	pub.Start()

	capitan.Emit(context.Background(), signal, key.Field(Order{
		ID:    "ORD-123",
		Total: 99.99,
	}))

	// Shut capitan down first so the emitted event is processed before asserting.
	capitan.Shutdown()
	pub.Close()

	if got := mock.PublishCount(); got != 1 {
		t.Fatalf("expected 1 message, got %d", got)
	}

	// Fail loudly on a malformed payload instead of comparing a zero-value Order.
	var order Order
	if err := json.Unmarshal(mock.Published()[0].Data, &order); err != nil {
		t.Fatalf("decoding published payload: %v", err)
	}
	if order.ID != "ORD-123" {
		t.Errorf("unexpected order ID: %s", order.ID)
	}
}
Testing Metadata Propagation
// TestPublisher_PropagatesMetadata checks that metadata set by publisher
// middleware reaches the provider. signal, key and order are assumed to be
// declared elsewhere in the test package.
func TestPublisher_PropagatesMetadata(t *testing.T) {
	mock := heraldtest.NewMockProvider()

	// Middleware to add metadata
	opts := []herald.Option[Order]{
		herald.WithMiddleware(
			herald.UseTransform[Order]("add-correlation", func(_ context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
				env.Metadata["correlation-id"] = "abc-123"
				return env
			}),
		),
	}

	pub := herald.NewPublisher(mock, signal, key, opts)
	pub.Start()

	capitan.Emit(context.Background(), signal, key.Field(order))
	capitan.Shutdown()
	pub.Close()

	// Guard the index below: fail with a clear message rather than panic
	// when nothing was published.
	published := mock.Published()
	if len(published) != 1 {
		t.Fatalf("expected 1 message, got %d", len(published))
	}

	meta := published[0].Metadata
	if meta["correlation-id"] != "abc-123" {
		t.Errorf("metadata not propagated")
	}
}
Testing Subscribers
Basic Subscriber Test
// TestSubscriber_EmitsToCapitan feeds one message through the mock provider's
// subscribe channel and checks that the subscriber re-emits it as a capitan
// event carrying the decoded Order.
func TestSubscriber_EmitsToCapitan(t *testing.T) {
	subCh := make(chan herald.Result[herald.Message], 1)
	// MockProvider is configured through its fluent methods.
	mock := heraldtest.NewMockProvider().WithSubscribeChannel(subCh)

	signal := capitan.NewSignal("test.sub", "Test")
	key := capitan.NewKey[Order]("order", "test.Order")

	// Buffered so the hook never blocks, even if it fires more than once.
	got := make(chan Order, 1)
	capitan.Hook(signal, func(_ context.Context, e *capitan.Event) {
		order, _ := key.From(e)
		select {
		case got <- order:
		default:
		}
	})

	sub := herald.NewSubscriber(mock, signal, key, nil)
	defer sub.Close()
	ctx, cancel := context.WithCancel(context.Background())
	// Deferred after sub.Close so cancel runs first, then Close.
	defer cancel()
	sub.Start(ctx)

	// Send message
	data, err := json.Marshal(Order{ID: "SUB-123", Total: 50.0})
	if err != nil {
		t.Fatalf("encoding order: %v", err)
	}
	subCh <- herald.NewSuccess(herald.Message{
		Data: data,
		Ack:  func() error { return nil },
		Nack: func() error { return nil },
	})

	// Bound the wait so a missing event fails the test instead of hanging it.
	select {
	case received := <-got:
		if received.ID != "SUB-123" {
			t.Errorf("unexpected order ID: %s", received.ID)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for event")
	}
}
Testing Ack/Nack
// TestSubscriber_AcksOnSuccess checks that a message delivered through the
// mock provider is acknowledged by the subscriber. signal and key are assumed
// to be declared elsewhere in the test package.
func TestSubscriber_AcksOnSuccess(t *testing.T) {
	subCh := make(chan herald.Result[herald.Message], 1)
	// MockProvider is configured through its fluent methods.
	mock := heraldtest.NewMockProvider().WithSubscribeChannel(subCh)

	data, err := json.Marshal(Order{ID: "123"})
	if err != nil {
		t.Fatalf("encoding order: %v", err)
	}

	// Signal the ack over a channel: a plain bool written from the Ack
	// callback and read here would be a data race, and sleeping is flaky.
	acked := make(chan struct{}, 1)
	subCh <- herald.NewSuccess(herald.Message{
		Data: data,
		Ack: func() error {
			select {
			case acked <- struct{}{}:
			default: // already signalled
			}
			return nil
		},
		Nack: func() error { return nil },
	})

	sub := herald.NewSubscriber(mock, signal, key, nil)
	defer sub.Close()
	ctx, cancel := context.WithCancel(context.Background())
	// Deferred after sub.Close so cancel runs first, then Close.
	defer cancel()
	sub.Start(ctx)

	// Wait for processing, with an upper bound
	select {
	case <-acked:
	case <-time.After(5 * time.Second):
		t.Error("message was not acknowledged")
	}
}
Testing Error Handling
// TestPublisher_EmitsErrorSignal verifies that a failed publish is reported on
// herald.ErrorSignal with Operation set to "publish". signal, key and order are
// assumed to be declared elsewhere in the test package.
//
// NOTE(review): mockProvider is not part of the testing package described in
// this guide; it is presumably a test-local provider whose publish call
// returns publishErr — confirm or define it alongside the test.
func TestPublisher_EmitsErrorSignal(t *testing.T) {
	ctx := context.Background()

	failingProvider := &mockProvider{
		publishErr: errors.New("connection refused"),
	}

	// Record the error event emitted by herald.
	var capturedErr herald.Error
	capitan.Hook(herald.ErrorSignal, func(_ context.Context, e *capitan.Event) {
		capturedErr, _ = herald.ErrorKey.From(e)
	})

	pub := herald.NewPublisher(failingProvider, signal, key, nil)
	pub.Start()

	capitan.Emit(ctx, signal, key.Field(order))

	// Shut capitan down first so the events are processed before asserting.
	capitan.Shutdown()
	pub.Close()

	if capturedErr.Operation != "publish" {
		t.Errorf("expected publish error, got %s", capturedErr.Operation)
	}
}
Integration Testing
For integration tests with real brokers, use the provider packages directly:
// TestKafkaIntegration sketches an integration test against a real Kafka
// broker. The writer and reader configuration is elided in this example.
func TestKafkaIntegration(t *testing.T) {
// Skip when tests are run in short mode (go test -short)
if testing.Short() {
t.Skip("skipping integration test")
}
// Setup real Kafka (e.g., with testcontainers)
writer := &kafkago.Writer{...}
reader := kafkago.NewReader(...)
// Build the provider for "test-topic" from the writer and reader above
provider := kafka.New("test-topic",
kafka.WithWriter(writer),
kafka.WithReader(reader),
)
// Close the provider when the test returns
defer provider.Close()
// Test actual publish/subscribe flow
// ...
}
Isolated Capitan Instances
Use isolated capitan instances to prevent test interference:
// TestWithIsolatedCapitan shows how to bind a publisher and a subscriber to a
// dedicated capitan instance instead of the package-level one.
// NOTE(review): mock, signal and key are assumed to be declared elsewhere.
func TestWithIsolatedCapitan(t *testing.T) {
// Create a dedicated instance and shut it down when the test returns
c := capitan.New()
defer c.Shutdown()
// Hand the instance to both sides through their capitan options
pub := herald.NewPublisher(mock, signal, key, nil,
herald.WithPublisherCapitan[Order](c))
sub := herald.NewSubscriber(mock, signal, key, nil,
herald.WithSubscriberCapitan[Order](c))
// Tests use isolated instance
// NOTE(review): pub and sub must actually be used (e.g. started and closed)
// in a real test; Go rejects declared-but-unused local variables.
}
Additional Test Helpers
The testing package provides additional utilities:
Message Helpers
// Create a message with no-op ack/nack
msg := heraldtest.NewTestMessage(data, metadata)

// Create a message with tracking callbacks. It gets its own name because msg
// is already declared above.
var acked, nacked bool
tracked := heraldtest.NewTestMessageWithAck(data, metadata,
	func() { acked = true },  // onAck
	func() { nacked = true }, // onNack
)
MessageCapture
Capture messages for verification:
// Create a capture to collect messages during a test
capture := heraldtest.NewMessageCapture()
// In your test setup, capture messages
capture.Capture(msg)
// Assert on the number of messages captured so far
if capture.Count() != 1 {
t.Errorf("expected 1 message, got %d", capture.Count())
}
messages := capture.Messages() // []herald.Message
// Wait for async messages; a false result is treated as a timeout
if !capture.WaitForCount(3, 5*time.Second) {
t.Fatal("timeout waiting for messages")
}
// Reset between tests
capture.Reset()
ErrorCapture
Capture herald errors for verification:
capture := heraldtest.NewErrorCapture()

// Hook into error signal
capitan.Hook(herald.ErrorSignal, func(_ context.Context, e *capitan.Event) {
	err, _ := herald.ErrorKey.From(e)
	capture.Capture(err)
})

// After test actions...
if capture.Count() != 1 {
	t.Errorf("expected 1 error, got %d", capture.Count())
}

// Named errs rather than errors so the standard errors package is not shadowed.
errs := capture.Errors() // []herald.Error

// Wait for async errors
if !capture.WaitForCount(1, time.Second) {
	t.Fatal("expected error not received")
}
Best Practices
- Use isolated capitan instances — Prevents test pollution
- Always call Shutdown() before asserting — Ensures emitted events are processed
- Use buffered channels — Prevents blocking in tests
- Test both paths — Publish and subscribe separately
- Verify metadata — Ensure propagation works correctly
- Test error cases — Verify error signal emission
- Use timeouts — Prevent hanging tests
// TestWithTimeout runs the test logic in its own goroutine and fails the test
// if that goroutine has not finished within five seconds.
func TestWithTimeout(t *testing.T) {
	finished := make(chan struct{})

	go func() {
		// Test logic
		close(finished)
	}()

	// Whichever fires first decides the outcome.
	deadline := time.After(5 * time.Second)
	select {
	case <-deadline:
		t.Fatal("test timed out")
	case <-finished:
		// Success
	}
}