Azure Service Bus - Integration Testing with xUnit and Admin Client

Part 3: How to integration test Azure Service Bus using ServiceBusAdministrationClient and xUnit

Service Bus Series - Part 3

The previous post, Part 2: Authentication and Sending to a Queue, covered authenticating to and writing to a Service Bus Queue. To do this we used the QueueSender background service. While this worked, it raises the question: how do we test code that integrates with Azure Service Bus? That's exactly what we'll answer here!

QueueSender - Our SUT

Our System Under Test for this exercise is the previously created QueueSender. This background service starts up, sends a single message, and completes. The entire service, from Part 2, is given below:

public class QueueSender
    : BackgroundService
{
    private readonly ILogger<QueueSender> logger;
    private readonly ServiceBusSender sender;

    public QueueSender(
        ILogger<QueueSender> logger,
        ServiceBusSender sender)
    {
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.sender = sender ?? throw new ArgumentNullException(nameof(sender));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, timeout.Token);
        try
        {
            await sender.SendMessageAsync(new("Hello World"), cts.Token);

            logger.LogInformation("Sent Message!!!");
        }
        catch (OperationCanceledException ex) 
            when (timeout.IsCancellationRequested)
        {
            logger.LogWarning(ex, "Operation timed out");
            throw;
        }
        catch (OperationCanceledException ex) 
            when (stoppingToken.IsCancellationRequested)
        {
            logger.LogInformation(ex, "Shutdown early");
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Unhandled exception");
            throw;
        }
    }
}

What We'll Test

This simple service presents a couple of unique challenges: it's a background service, and it uses Service Bus as a message broker. Testing a background service can be difficult because its only public methods are StartAsync and StopAsync, so we have only indirect access to what the service does. Testing against Service Bus is difficult in its own right, since it cannot be run locally and any test against it must treat it as the shared resource that it is.
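To make the "indirect access" point concrete, here is a minimal sketch of driving a background service purely through StartAsync and StopAsync and observing a side effect. OneShotService and BackgroundServiceDemo are hypothetical stand-ins that are not part of the series code, and the sketch assumes .NET 6 or later (for Task.WaitAsync) plus the Microsoft.Extensions.Hosting package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical stand-in for a service like QueueSender: it performs one unit
// of work, supplied as a delegate, and then completes.
public sealed class OneShotService : BackgroundService
{
    private readonly Func<CancellationToken, Task> work;

    public OneShotService(Func<CancellationToken, Task> work)
        => this.work = work ?? throw new ArgumentNullException(nameof(work));

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
        => work(stoppingToken);
}

public static class BackgroundServiceDemo
{
    public static async Task<bool> RunAsync()
    {
        // The only thing the test can observe is the side effect of the work.
        var observed = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        using var service = new OneShotService(async ct =>
        {
            await Task.Delay(50, ct);   // simulates sending a message
            observed.SetResult(true);
        });

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        // StartAsync kicks off ExecuteAsync and returns without waiting for
        // the background work to finish.
        await service.StartAsync(cts.Token);

        // Wait for the side effect, bounded by the timeout.
        var result = await observed.Task.WaitAsync(cts.Token);

        await service.StopAsync(cts.Token);
        return result;
    }
}
```

In the real tests the observable side effect is a message arriving on a queue rather than a TaskCompletionSource, but the shape is the same: start, observe from the outside, stop.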

With these considerations in mind, we'll focus our testing on integration and verify the following:

  1. Does our system connect to our Service Bus?

  2. Does the specified Queue exist?

  3. Can we receive the message that we expect to be sent?


xUnit Project Setup

To start we'll create a new xUnit test project via the following:

dotnet new xunit -o Part3

Next, add a reference to the project containing our SUT, in my case Part2.csproj:

dotnet add reference Part2.csproj

Then, go ahead and delete UnitTest1.cs.

Admin Client Creation

The key to integration testing an Azure Service Bus is the class ServiceBusAdministrationClient and the functionality it provides. From this client you can control, verify, and manipulate virtually anything about your Service Bus that you could via an IaC template or the Azure Portal itself.

Creating an Admin Client is as easy as creating a standard Service Bus Client. You only need the fullyQualifiedNamespace and a TokenCredential; the default ServiceBusAdministrationClientOptions will work fine for our purpose.

new ServiceBusAdministrationClient(hostName, credentials, adminOptions);

Note, though, that the identity and credentials you use must have permission to perform the operations you require. The simple identity we set up previously will work for our tests.
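As a rough illustration of what the Admin Client can inspect, the sketch below creates a client and reads a queue's runtime counters. It assumes DefaultAzureCredential from the Azure.Identity package rather than the credential helper from Part 2, and AdminClientSketch is a hypothetical name used only for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus.Administration;

public static class AdminClientSketch
{
    // fullyQualifiedNamespace looks like "<your-namespace>.servicebus.windows.net".
    // Constructing the client does not contact Azure; calls are made lazily.
    public static ServiceBusAdministrationClient Create(string fullyQualifiedNamespace)
        => new ServiceBusAdministrationClient(
            fullyQualifiedNamespace,
            new DefaultAzureCredential());   // assumption: default credential chain

    public static async Task PrintQueueCountsAsync(
        ServiceBusAdministrationClient adminClient,
        string queueName)
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        // Runtime properties expose live counters for the queue.
        var response = await adminClient.GetQueueRuntimePropertiesAsync(queueName, cts.Token);
        QueueRuntimeProperties runtime = response.Value;

        Console.WriteLine(
            $"{runtime.Name}: active={runtime.ActiveMessageCount}, " +
            $"dead-lettered={runtime.DeadLetterMessageCount}");
    }
}
```

Calling PrintQueueCountsAsync requires a real namespace and an identity with sufficient permissions, so treat it as a starting point rather than something to run as-is.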

Creating the Test Fixture

For each of our 3 tests outlined above, we'll leverage the same underlying Test Fixture. This fixture provides access to a fresh IConfiguration, our ServiceBusAdministrationClient, and a ServiceBusReceiver.

public sealed class ServiceBusFixture
    : IAsyncLifetime
{
    private readonly ServiceBusClient client;

    public IConfiguration Config => BuildConfig();
    public ServiceBusAdministrationClient AdminClient { get; }

    public ServiceBusFixture()
    {
        var credentials = Config.CreateDefaultCredential();
        var hostName = Config["ServiceBusHost"];

        client = new ServiceBusClient(hostName, credentials, new()
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets,
            Identifier = "Test-Client"
        });

        AdminClient = new ServiceBusAdministrationClient(hostName, credentials, new());
    }

    public ServiceBusReceiver GetReceiver(string queue)
        => client.CreateReceiver(queue, options: new()
        {
            ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
            Identifier = "Test-Receiver"
        });

    private static IConfiguration BuildConfig()
        => new ConfigurationBuilder()
        .AddUserSecrets<ServiceBusFixture>()
        .Build();

    public Task InitializeAsync()
        => Task.CompletedTask;

    public async Task DisposeAsync()
        => await (client?.DisposeAsync() ?? ValueTask.CompletedTask);
}

The Test Fixture also implements the IAsyncLifetime interface to properly dispose of the Service Bus Client after the tests complete.

Testing the Connection and Queue

Our first two tests can be combined:

  1. Does our system connect to our Service Bus?

  2. Does the specified Queue exist?

If we check for the existence of the specified Queue, we'll also ensure our connection to the Service Bus is working.

To test this, we'll create a new test file ServiceBusConnectionTests.cs and bring in our Test Fixture. From there we'll use the Admin Client to check for existence of the Queue within a given timeout since the operation is async and can fail.

public sealed class ServiceBusConnectionTests
    : IClassFixture<ServiceBusFixture>
{
    private readonly IConfiguration config;
    private readonly ServiceBusAdministrationClient adminClient;

    public ServiceBusConnectionTests(ServiceBusFixture fixture)
    {
        ArgumentNullException.ThrowIfNull(fixture);

        config = fixture.Config
            ?? throw new ArgumentNullException(nameof(fixture.Config));
        adminClient = fixture.AdminClient
            ?? throw new ArgumentNullException(nameof(fixture.AdminClient));
    }

    [Fact]
    public async Task Can_Connect_And_Queue_Exists()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));        
        var queueName = config["Queue"];

        var result = await adminClient.QueueExistsAsync(queueName, cts.Token);

        Assert.True(result);
    }
}

Testing Sending and Receiving

Our third test is a little more complicated:

  3. Can we receive the message that we expect to be sent?

We'll actually be creating an instance of our SUT, the QueueSender, and ensuring the message that we expect to be sent can then be received. Our test follows essentially four steps:

  1. Create a temporary test queue and point our config to it

  2. Create and start the background service QueueSender

  3. Receive any message from the test queue

  4. Shut down the QueueSender and clean up the test queue

To begin, create a new test class ServiceBusSenderTests.cs and inject the Test Fixture:

public sealed class ServiceBusSenderTests
    : IClassFixture<ServiceBusFixture>,
    IAsyncLifetime
{    
    private readonly ServiceBusAdministrationClient adminClient;
    private readonly IConfiguration config;
    private readonly ServiceBusReceiver receiver;
    private readonly string testQueue;
    private QueueProperties queueProps = default!;    

    public ServiceBusSenderTests(ServiceBusFixture fixture)
    {
        ArgumentNullException.ThrowIfNull(fixture);

        adminClient = fixture.AdminClient
            ?? throw new ArgumentNullException(nameof(fixture.AdminClient));
        config = fixture.Config
            ?? throw new ArgumentNullException(nameof(fixture.Config));

        testQueue = $"Test-{Guid.NewGuid()}";
        receiver = fixture.GetReceiver(testQueue);
    }
}

We generate a test queue name from a random GUID prefixed with 'Test' so it can be identified in case it isn't cleaned up. Then, we pull a new ServiceBusReceiver from our fixture, pointed at that test queue.

Then, we'll use the implementation of IAsyncLifetime to get the existing QueueProperties during InitializeAsync. This is an important step. We could just create any random new queue; however, for our test to really ensure our shared queue is set up correctly, we need to duplicate it with its existing properties, only renamed.

public async Task InitializeAsync()
{
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    var queueName = config["Queue"];
    var propsResponse = await adminClient.GetQueueAsync(queueName, cts.Token);
    queueProps = propsResponse.Value;
}

Then in our DisposeAsync we'll use the adminClient to clean up the test queue if it has been created.

public async Task DisposeAsync()
{
    if (adminClient is null)
        return;

    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    // Delete the per-test queue (named in the constructor) if the test created it.
    var queueExists = await adminClient.QueueExistsAsync(testQueue, cts.Token);
    if (queueExists)
        await adminClient.DeleteQueueAsync(testQueue, cts.Token);
}
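If a test run is killed before DisposeAsync executes, a queue with the 'Test' prefix can be left behind. The following is a minimal sketch of a sweep that removes such leftovers; TestQueueSweeper is a hypothetical helper that is not part of the series code. It only deletes queues older than a minimum age, on the assumption that younger ones may belong to a test run still in flight on the shared namespace.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class TestQueueSweeper
{
    // Pure selection rule, kept separate so it can be checked without Azure.
    public static bool IsStaleTestQueue(
        string name,
        DateTimeOffset createdAt,
        DateTimeOffset now,
        TimeSpan minAge,
        string prefix = "Test-")
        => name.StartsWith(prefix, StringComparison.OrdinalIgnoreCase)
        && now - createdAt >= minAge;

    // Deletes stale test queues and returns how many were removed.
    public static async Task<int> SweepAsync(
        ServiceBusAdministrationClient adminClient,
        TimeSpan minAge,
        CancellationToken cancellationToken = default)
    {
        var now = DateTimeOffset.UtcNow;
        var stale = new List<string>();

        // Runtime properties include the creation time of each queue.
        await foreach (var queue in adminClient.GetQueuesRuntimePropertiesAsync(cancellationToken))
        {
            if (IsStaleTestQueue(queue.Name, queue.CreatedAt, now, minAge))
                stale.Add(queue.Name);
        }

        foreach (var name in stale)
            await adminClient.DeleteQueueAsync(name, cancellationToken);

        return stale.Count;
    }
}
```

A call such as `await TestQueueSweeper.SweepAsync(adminClient, TimeSpan.FromHours(1))` could run as an occasional maintenance step; the one-hour threshold is an arbitrary choice for illustration.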

Next, we have a helper method to build the QueueSender. It sets up a Service Collection to register all dependencies and then creates our QueueSender from the config instance that we will point at our test queue.

private QueueSender BuildQueueSender(IConfiguration config)
    => new ServiceCollection()
    .AddSingleton(config)
    .AddSingleton<QueueSender>()
    .AddServiceBusForQueueSender()
    .AddLogging()
    .BuildServiceProvider()
    .GetRequiredService<QueueSender>();

Finally, in our test implementation we execute the following steps:

  1. Update the config and create the test queue

  2. Create and start the QueueSender

  3. Receive and assert against any messages

  4. Stop the QueueSender

We do all of this within a 10-second timeout, which ensures our test doesn't hang for too long and eventually moves on to cleaning up the test queue.

[Fact]
public async Task Can_Send_And_Receive_Message()
{
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    config["Queue"] = testQueue;        
    await adminClient.CreateQueueAsync(new CreateQueueOptions(queueProps)
    {
        Name = testQueue
    }, cts.Token);

    var sender = BuildQueueSender(config);
    await sender.StartAsync(cts.Token);

    var msg = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(2), cts.Token);

    Assert.NotNull(msg);
    var expected = "Hello World";
    Assert.Equal(expected, $"{msg.Body}");

    await sender.StopAsync(cts.Token);
}

Wrap Up & Repo

This was a solid introduction to the ServiceBusAdministrationClient. Many users of Azure Service Bus never get introduced to this very powerful component. Here, we've leveraged its capabilities to integration test our connection with Service Bus, the existence of a Queue, and ultimately the functionality of a background service. With this approach, multiple engineers and CI/CD pipelines can execute these tests against our shared Service Bus resource without conflicting and causing flaky test results.

Of course, all code is available on GitHub ⇒ ConcurrentFlows.AzureBusSeries

If there's anything specific you'd like covered regarding Service Bus, please drop an ask in the comments!