zoobzio December 10, 2025 Edit this page

Testing

Herald provides utilities for testing applications that use message bridging.

MockProvider

The testing package provides a mock provider for unit tests:

import heraldtest "github.com/zoobz-io/herald/testing"

func TestOrderPublishing(t *testing.T) {
    // Create mock provider
    mock := heraldtest.NewMockProvider()

    // Create publisher with mock
    pub := herald.NewPublisher(mock, orderSignal, orderKey, nil)
    pub.Start()

    // Emit event
    capitan.Emit(ctx, orderSignal, orderKey.Field(Order{ID: "123"}))
    capitan.Shutdown()
    pub.Close()

    // Verify
    if mock.PublishCount() != 1 {
        t.Errorf("expected 1 publish, got %d", mock.PublishCount())
    }

    published := mock.Published()
    // Assert on published[0].Data and published[0].Metadata
}

MockProvider API

MockProvider uses a fluent builder pattern:

// Create basic mock
mock := heraldtest.NewMockProvider()

// Configure with fluent methods (returns *MockProvider for chaining)
subCh := make(chan herald.Result[herald.Message], 10)
mock := heraldtest.NewMockProvider().
    WithSubscribeChannel(subCh).
    WithPublishCallback(func(data []byte, meta herald.Metadata) {
        // Custom logic on each publish
    })

// Query state
mock.Published()     // []PublishedMessage - all published messages
mock.PublishCount()  // int - number of published messages
mock.IsClosed()      // bool - whether Close() was called
mock.Reset()         // Clear published messages

Testing Publishers

Basic Publisher Test

func TestPublisher_EmitsToProvider(t *testing.T) {
    mock := heraldtest.NewMockProvider()

    signal := capitan.NewSignal("test.pub", "Test")
    key := capitan.NewKey[Order]("order", "test.Order")

    pub := herald.NewPublisher(mock, signal, key, nil)
    pub.Start(context.Background())

    capitan.Emit(context.Background(), signal, key.Field(Order{
        ID:    "ORD-123",
        Total: 99.99,
    }))

    capitan.Shutdown()
    pub.Close()

    if mock.PublishCount() != 1 {
        t.Fatalf("expected 1 message, got %d", mock.PublishCount())
    }

    var order Order
    json.Unmarshal(mock.Published()[0].Data, &order)

    if order.ID != "ORD-123" {
        t.Errorf("unexpected order ID: %s", order.ID)
    }
}

Testing Metadata Propagation

func TestPublisher_PropagatesMetadata(t *testing.T) {
    mock := heraldtest.NewMockProvider()

    // Middleware to add metadata
    opts := []herald.Option[Order]{
        herald.WithMiddleware(
            herald.UseTransform[Order]("add-correlation", func(_ context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
                env.Metadata["correlation-id"] = "abc-123"
                return env
            }),
        ),
    }

    pub := herald.NewPublisher(mock, signal, key, opts)
    pub.Start()

    capitan.Emit(context.Background(), signal, key.Field(order))
    capitan.Shutdown()
    pub.Close()

    meta := mock.Published()[0].Metadata
    if meta["correlation-id"] != "abc-123" {
        t.Errorf("metadata not propagated")
    }
}

Testing Subscribers

Basic Subscriber Test

func TestSubscriber_EmitsToCapitan(t *testing.T) {
    subCh := make(chan herald.Result[herald.Message], 1)
    mock := heraldtest.NewMockProvider(
        heraldtest.WithSubscribeChannel(subCh),
    )

    signal := capitan.NewSignal("test.sub", "Test")
    key := capitan.NewKey[Order]("order", "test.Order")

    var received Order
    var wg sync.WaitGroup
    wg.Add(1)

    capitan.Hook(signal, func(_ context.Context, e *capitan.Event) {
        received, _ = key.From(e)
        wg.Done()
    })

    sub := herald.NewSubscriber(mock, signal, key, nil)
    ctx, cancel := context.WithCancel(context.Background())
    sub.Start(ctx)

    // Send message
    data, _ := json.Marshal(Order{ID: "SUB-123", Total: 50.0})
    subCh <- herald.NewSuccess(herald.Message{
        Data: data,
        Ack:  func() error { return nil },
        Nack: func() error { return nil },
    })

    wg.Wait()
    cancel()
    sub.Close()

    if received.ID != "SUB-123" {
        t.Errorf("unexpected order ID: %s", received.ID)
    }
}

Testing Ack/Nack

func TestSubscriber_AcksOnSuccess(t *testing.T) {
    subCh := make(chan herald.Result[herald.Message], 1)
    mock := heraldtest.NewMockProvider(
        heraldtest.WithSubscribeChannel(subCh),
    )

    var acked bool
    data, _ := json.Marshal(Order{ID: "123"})
    subCh <- herald.NewSuccess(herald.Message{
        Data: data,
        Ack:  func() error { acked = true; return nil },
        Nack: func() error { return nil },
    })

    sub := herald.NewSubscriber(mock, signal, key, nil)
    ctx, cancel := context.WithCancel(context.Background())
    sub.Start(ctx)

    // Wait for processing
    time.Sleep(50 * time.Millisecond)

    cancel()
    sub.Close()

    if !acked {
        t.Error("message was not acknowledged")
    }
}

Testing Error Handling

func TestPublisher_EmitsErrorSignal(t *testing.T) {
    failingProvider := &mockProvider{
        publishErr: errors.New("connection refused"),
    }

    var capturedErr herald.Error

    capitan.Hook(herald.ErrorSignal, func(_ context.Context, e *capitan.Event) {
        capturedErr, _ = herald.ErrorKey.From(e)
    })

    pub := herald.NewPublisher(failingProvider, signal, key, nil)
    pub.Start()

    capitan.Emit(ctx, signal, key.Field(order))
    capitan.Shutdown()
    pub.Close()

    if capturedErr.Operation != "publish" {
        t.Errorf("expected publish error, got %s", capturedErr.Operation)
    }
}

Integration Testing

For integration tests with real brokers, use the provider packages directly:

func TestKafkaIntegration(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping integration test")
    }

    // Setup real Kafka (e.g., with testcontainers)
    writer := &kafkago.Writer{...}
    reader := kafkago.NewReader(...)

    provider := kafka.New("test-topic",
        kafka.WithWriter(writer),
        kafka.WithReader(reader),
    )
    defer provider.Close()

    // Test actual publish/subscribe flow
    // ...
}

Isolated Capitan Instances

Use isolated capitan instances to prevent test interference:

func TestWithIsolatedCapitan(t *testing.T) {
    c := capitan.New()
    defer c.Shutdown()

    pub := herald.NewPublisher(mock, signal, key, nil,
        herald.WithPublisherCapitan[Order](c))

    sub := herald.NewSubscriber(mock, signal, key, nil,
        herald.WithSubscriberCapitan[Order](c))

    // Tests use isolated instance
}

Additional Test Helpers

The testing package provides additional utilities:

Message Helpers

// Create a message with no-op ack/nack
msg := heraldtest.NewTestMessage(data, metadata)

// Create a message with tracking callbacks
var acked, nacked bool
msg := heraldtest.NewTestMessageWithAck(data, metadata,
    func() { acked = true },   // onAck
    func() { nacked = true },  // onNack
)

MessageCapture

Capture messages for verification:

capture := heraldtest.NewMessageCapture()

// In your test setup, capture messages
capture.Capture(msg)

// Assert
if capture.Count() != 1 {
    t.Errorf("expected 1 message, got %d", capture.Count())
}

messages := capture.Messages() // []herald.Message

// Wait for async messages
if !capture.WaitForCount(3, 5*time.Second) {
    t.Fatal("timeout waiting for messages")
}

// Reset between tests
capture.Reset()

ErrorCapture

Capture herald errors for verification:

capture := heraldtest.NewErrorCapture()

// Hook into error signal
capitan.Hook(herald.ErrorSignal, func(_ context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)
    capture.Capture(err)
})

// After test actions...
if capture.Count() != 1 {
    t.Errorf("expected 1 error, got %d", capture.Count())
}

errors := capture.Errors() // []herald.Error

// Wait for async errors
if !capture.WaitForCount(1, time.Second) {
    t.Fatal("expected error not received")
}

Best Practices

  1. Use isolated capitan instances — Prevents test pollution
  2. Always call Shutdown() — Ensures events are processed
  3. Use buffered channels — Prevents blocking in tests
  4. Test both paths — Publish and subscribe separately
  5. Verify metadata — Ensure propagation works correctly
  6. Test error cases — Verify error signal emission
  7. Use timeouts — Prevent hanging tests
func TestWithTimeout(t *testing.T) {
    done := make(chan struct{})

    go func() {
        // Test logic
        close(done)
    }()

    select {
    case <-done:
        // Success
    case <-time.After(5 * time.Second):
        t.Fatal("test timed out")
    }
}