Tag Archives: RabbitMQ

Reading RabbitMQ Settings Using .NET Core Configuration

The .NET Core Configuration system is extremely powerful and flexible. One of the features that I use the most is its capability to bind structured settings from a source (e.g. JSON file) to a C# object.

A very good example of this is obtaining RabbitMQ settings so that you can populate the ConnectionFactory. In the past, I’ve created a DTO (class) for this, and a parser that could populate this class based on a connection string format that I invented on the spot out of necessity. The good news is that you don’t have to do this any more. .NET Core configuration allows you to bind your config to an object, even if it’s coming from a third party library. Let’s see how.

Typical RabbitMQ Configuration

First, in order to use RabbitMQ, we need to install the RabbitMQ Client NuGet package.

Install-Package RabbitMQ.Client

Next, we’ll typically create a connection by means of the ConnectionFactory. We’ll need to populate the necessary fields, whether directly or by reading them from config. Technically, most of the settings below are not necessary because defaults are assumed if not provided, but we’ll include them anyway as we’re not assuming everyone is connecting to RabbitMQ on localhost.

            var connectionFactory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest",
                VirtualHost = "/",
                AutomaticRecoveryEnabled = true,
                RequestedHeartbeat = 30
            };

If we use .NET Core configuration, we don’t even need to do this any more.

Connection Settings From JSON File

Let’s start by adding a new text file to the project called appsettings.json. From its properties, change it to copy to the output directory on build (Copy always and Copy if newer are both fine). In the file, we’ll add the JSON equivalent of what we have in ConnectionFactory above:

{
  "RabbitMqConnection": {
    "HostName": "localhost",
    "Username": "guest",
    "Password": "guest",
    "VirtualHost": "/",
    "AutomaticRecoveryEnabled": true,
    "RequestedHeartbeat": 30
  }
}

Now, we need a way to read this JSON file and bind it to a ConnectionFactory object. To do that, we need the following NuGet packages:

Install-Package Microsoft.Extensions.Configuration
Install-Package Microsoft.Extensions.Configuration.Json
Install-Package Microsoft.Extensions.Configuration.Binder

The .NET Core configuration system is split into multiple packages, so you can bring in only what you actually need. The first package is the heart of the framework, and you don’t need to install it directly because the second and third packages will both bring it in as a dependency when you install them. As for the Json and Binder packages, we’ll see what they do in a minute.

.NET Core configuration is loaded by means of a ConfigurationBuilder object. In our case, we’ll have:

            var config = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json")
                .Build();

The AddJsonFile() extension method is provided by the Json package (the second one we installed earlier). The result of this is an object which implements IConfigurationRoot, and we can use this to read our settings.

Next, we’ll prepare an empty ConnectionFactory object that the binder will populate from the configuration in the next step.

var connectionFactory = new ConnectionFactory();

Finally, we can bind the entire “RabbitMqConnection” section of the appsettings.json file to our ConnectionFactory object, using the Bind() method (provided via the Binder package we installed earlier):

config.GetSection("RabbitMqConnection").Bind(connectionFactory);

If the key-value pairs in the JSON section match properties on the connectionFactory object, they will be set. The binder matches names case-insensitively, which is why “Username” in the JSON still populates the UserName property. You’ll know it worked because RequestedHeartbeat has a default value of 60, but it will be overridden by the value of 30 from appsettings.json.
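To see the binder on its own, without a broker or a JSON file, here is a minimal sketch that binds an in-memory section onto a plain class. BrokerSettings, its default values and the host name are hypothetical stand-ins for ConnectionFactory; AddInMemoryCollection() comes from the Microsoft.Extensions.Configuration package. One key is deliberately written in a different case to show that property matching ignores case.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Hypothetical settings class, standing in for ConnectionFactory.
public class BrokerSettings
{
    public string HostName { get; set; } = "localhost";
    public string UserName { get; set; } = "guest";
    public int RequestedHeartbeat { get; set; } = 60;
}

public static class BinderDemo
{
    public static BrokerSettings Load()
    {
        // Same shape as the JSON section, flattened into colon-separated keys.
        var values = new Dictionary<string, string>
        {
            ["RabbitMqConnection:hostname"] = "broker.example.com",
            ["RabbitMqConnection:Username"] = "alice",
            ["RabbitMqConnection:RequestedHeartbeat"] = "30"
        };

        var config = new ConfigurationBuilder()
            .AddInMemoryCollection(values)
            .Build();

        // Start from the defaults, then let the binder overwrite what it finds.
        var settings = new BrokerSettings();
        config.GetSection("RabbitMqConnection").Bind(settings);
        return settings;
    }

    public static void Main()
    {
        var s = Load();
        Console.WriteLine($"{s.HostName} {s.UserName} {s.RequestedHeartbeat}");
    }
}
```

Any property that has no matching key simply keeps the value it had before Bind() was called, which is how the ConnectionFactory defaults survive when a setting is left out of the file.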

Testing Connectivity

Now that you are populating the ConnectionFactory, you can connect in the same way as you used to before. This should suffice, as you’ll get an exception if your connection settings are incorrect:

            using (var conn = connectionFactory.CreateConnection())
            {
                Console.WriteLine("Connected!");

                // ...                

                Console.ReadLine();
            }

But if you want to make damn sure that you can actually interact with RabbitMQ, you can write a minimal consumer, and then send it messages via the Management Plugin’s Web UI:

            using (var conn = connectionFactory.CreateConnection())
            using (var channel = conn.CreateModel())
            {
                Console.WriteLine("Connected!");

                const string queueName = "madrid";
                channel.QueueDeclare(queueName, true, false, false, null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (s, a) => Console.WriteLine("Message received!");
                channel.BasicConsume(queueName, true, consumer);

                Console.ReadLine();
            }

Wondering about the name of the queue? It’s because I found lots of them in Madrid. No kidding:

Summary

The point here was to show you how you can read settings from a section of a JSON file and have it directly deserialized into an object, using the binder feature of .NET Core configuration. The example here is specific to RabbitMQ, but you can use the same approach with any class you like, as long as the properties have public setters.

Also, remember that .NET Core configuration actually targets .NET Standard. That means you can use it not only with .NET Core apps, but also in the full .NET Framework, and any other compatible runtimes.

Asynchronous RabbitMQ Consumers in .NET

It’s quite common to do some sort of I/O operation (e.g. REST call) whenever a message is consumed by a RabbitMQ client. This should be done asynchronously, but it’s not as simple as changing the event handling code to async void.

In “The Dangers of async void Event Handlers”, I explained how making an event handler async void will mess up the message order, because the dispatcher loop will not wait for a message to be fully processed before calling the handler on the next one.

While that article provided a workaround that is still useful with older versions of the RabbitMQ Client library, it turns out that there is an AsyncEventingBasicConsumer as of RabbitMQ.Client 5.0.0-pre3 which works well for asynchronous message consumption.

AsyncEventingBasicConsumer Example

First, we need to make sure that the RabbitMQ client library is installed.

Install-Package RabbitMQ.Client

Then, we can set up a publisher and consumer to show how to use the AsyncEventingBasicConsumer. Since this is just a demonstration, we can have both in the same process:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { DispatchConsumersAsync = true };
            const string queueName = "myqueue";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queueName, true, false, false, null);

                // consumer

                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.Received += Consumer_Received;
                channel.BasicConsume(queueName, true, consumer);

                // publisher

                var props = channel.CreateBasicProperties();
                int i = 0;

                while (true)
                {
                    var messageBody = Encoding.UTF8.GetBytes($"Message {++i}");
                    channel.BasicPublish("", queueName, props, messageBody);
                    Thread.Sleep(50);
                }
            }
        }

There is nothing really special about the above code except that we’re using AsyncEventingBasicConsumer instead of EventingBasicConsumer, and that the ConnectionFactory is now being set up with a suspicious-looking DispatchConsumersAsync property set to true. The ConnectionFactory is using defaults, so it will connect to localhost using the guest account.

The message handler is expected to return Task, and this makes it very easy to use proper asynchronous code:

        private static async Task Consumer_Received(object sender, BasicDeliverEventArgs @event)
        {
            var message = Encoding.UTF8.GetString(@event.Body);

            Console.WriteLine($"Begin processing {message}");

            await Task.Delay(250);

            Console.WriteLine($"End processing {message}");
        }

The messages are indeed processed in order:

How to Mess This Up

Remember that DispatchConsumersAsync property? I haven’t really found any documentation explaining what it actually does, but we can venture a guess after a couple of experiments.

First, let’s keep that property, but use a synchronous EventingBasicConsumer instead (which also means changing the event handler to have a void return type). When we run this, we get an error:

It says “In the async mode you have to use an async consumer”. Which I suppose is fair enough.

So now, let’s go back to using an AsyncEventingBasicConsumer, but leave out the DispatchConsumersAsync property:

var factory = new ConnectionFactory();

This time, you’ll see that the event handler is not firing (nothing is being written to the console). The messages are indeed being published, and the queue remains at zero messages, so they are being consumed (you’ll see them accumulate if you disable the consumer).

This is actually quite dangerous, yet there is no error like the one we saw earlier. It means that if a developer forgets to set that DispatchConsumersAsync property, then all messages are lost. It’s also quite strange that the choice of how to dispatch messages to the consumer (i.e. sync or async) is a property of the connection rather than the consumer, although presumably it would be a result of some internal plumbing in the RabbitMQ Client library.

Summary

AsyncEventingBasicConsumer is great for having pure asynchronous RabbitMQ consumers, but don’t forget that DispatchConsumersAsync property.

It’s only available since RabbitMQ.Client 5.0.0-pre3, so if you’re on an older version, use the workaround described in “The Dangers of async void Event Handlers” instead.

RabbitMQ: Who Creates the Queues and Exchanges?

Messaging is a fundamental part of any distributed architecture. It allows a publisher to send a message to any number of consumers, without having to know anything about them. This is great for truly asynchronous and decoupled communications.

The above diagram shows a very basic but typical setup you would see when using RabbitMQ. A publisher publishes a message onto an exchange. The exchange handles the logic of routing the message to the queues that are bound to it. For instance, if it is a fanout exchange, then a copy of the message is placed on each bound queue. A consumer can then read messages from a queue and process them.

An important assumption for this setup to work is that when publishers and consumers are run, all this RabbitMQ infrastructure (i.e. the queues, exchanges and bindings) must already exist. A publisher would not be able to publish to an exchange that does not exist, and neither could a consumer take messages from a nonexistent queue.

Thus, it is not unreasonable to have publishers and/or consumers create the queues, exchanges and bindings they need before beginning to send and receive messages. Let’s take a look at how this may be done, and the implications of each approach.

1. Split Responsibility

To have publishers and consumers fully decoupled from each other, ideally the publisher should know only about the exchange (not the queues), and the consumers should know only about the queues (not the exchange). The bindings are the glue between the exchange and the queues.

One possible approach could be to have the publisher handle the creation of the exchange, and consumers create the queues they need and bind them to the exchange. This has the advantage of decoupling: as new queues are needed, the consumers that need them will simply create and bind them as needed, without the publisher needing to know anything about them. It is not fully decoupled though, as the consumers must know the exchange in order to bind to it.

On the other hand, there is a very real danger of losing messages. If the publisher is deployed before any consumers are running, then the exchange will have no bindings, and any messages published to it will be lost. Whether this is acceptable depends on the application.
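As a rough sketch of this split, using the same RabbitMQ.Client calls that appear in the other articles here: the publisher declares only the exchange, and each consumer declares its own queue and binds it. The class name, the “orders” exchange name and the choice of a durable fanout exchange are assumptions made purely for illustration.

```csharp
using RabbitMQ.Client;

public static class SplitTopology
{
    // Hypothetical exchange name, used only for illustration.
    public const string ExchangeName = "orders";

    // Publisher side: knows the exchange, and nothing about queues.
    public static void DeclareForPublisher(IModel channel)
    {
        channel.ExchangeDeclare(ExchangeName, "fanout", true);
    }

    // Consumer side: owns its queue and binds it to the exchange.
    public static void DeclareForConsumer(IModel channel, string queueName)
    {
        channel.QueueDeclare(queueName, true, false, false, null);
        channel.QueueBind(queueName, ExchangeName, "");
    }
}
```

Note that DeclareForConsumer() still has to name the exchange, which is the residual coupling mentioned above. It also relies on the exchange already existing: if a consumer starts before the publisher has declared it, the QueueBind() call fails.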

2. Publisher Creates Everything

The publisher could be configured to create all the necessary infrastructure (exchanges, queues and bindings) as soon as it runs. This has the advantage that no messages will be lost (because queues will be bound to the exchange without needing any consumers to run first).

However, this means that the publisher must know about all the queues that will be bound to the exchange, which is not a very decoupled approach. Every time a new queue is added, the publisher must be reconfigured and redeployed to create it and bind it.

3. Consumer Creates Everything

The opposite approach is to have consumers create exchanges, queues and bindings that they need, as soon as they run. Like the previous approach, this introduces coupling, because consumers must know the exchange that their queues are binding to. Any change in the exchange (renaming, for instance) means that all consumers must be reconfigured and redeployed. This complexity may be prohibitive when a large number of queues and consumers are present.

4. Neither Creates Anything

A completely different option is for neither the publisher nor the consumer to create any of the required infrastructure. Instead, it is created beforehand using either the user interface of the Management Plugin or the Management CLI. This has several advantages:

  • Publishers and consumers can be truly decoupled. Publishers know only about exchanges, and consumers know only about queues.
  • This can easily be scripted and automated as part of a deployment pipeline.
  • Any changes (e.g. new queues) can be added without needing to touch any of the existing, already-deployed publishers and consumers.

Summary

Asynchronous messaging is a great way to decouple services in a distributed architecture, but to keep them decoupled, a valid strategy for maintaining the underlying messaging constructs (in the case of RabbitMQ, these would be the queues, exchanges and bindings) is necessary.

While publisher and consumer services may themselves take care of creating what they need, there could be a heavy price to pay in terms of initial message loss, coupling, and operational maintenance (in terms of configuration and deployment).

The best approach is probably to handle the messaging system configuration where it belongs: scripting it outside of the application. This ensures that services remain decoupled, and that the queueing system can change dynamically as needed without having to impact a lot of existing services.

The Dangers of async void Event Handlers

When using async/await, you’ll want to use async Task methods most of the time, and use async void methods only for event handlers (see “Async/Await – Best Practices in Asynchronous Programming”, by Stephen Cleary, MSDN Magazine, March 2013).

This conventional wisdom works great if you’re building something like a WPF (GUI) application, and event handlers are invoked occasionally as a result of user interaction (e.g. user presses a button, and an event fires). However, there is another class of event handlers that are invoked as part of a dispatcher loop in a third-party library. async void can be pretty dangerous in these cases.

async void Event Handlers in RabbitMQ

Let’s take the example of RabbitMQ. We’ll set up a basic publisher and consumer. With a single queue and a single consumer, we expect messages to be handled one at a time, in the order they were published.

First, install the RabbitMQ Client library via NuGet:

Install-Package RabbitMQ.Client

Then, we can set up a basic publisher and consumer:

        static void Main(string[] args)
        {
            Console.Title = "async RabbitMQ";

            var factory = new ConnectionFactory();

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                const string queueName = "testqueue";

                // create queue if not already there

                channel.QueueDeclare(queueName, true, false, false, null);

                // publish

                var props = channel.CreateBasicProperties();
                    
                for (int i = 0; i < 5; i++)
                {
                    var msgBytes = Encoding.UTF8.GetBytes("Message " + i);
                    channel.BasicPublish("", queueName, props, msgBytes);
                }

                // set up consumer

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += Consumer_Received;
                channel.BasicConsume(queueName, true, consumer);

                Console.ReadLine();
            }
        }

Our consumer will call the Consumer_Received event handler whenever a message is received. This is the first version of the event handler:

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);

            Console.WriteLine("Began handling " + content);

            Thread.Sleep(1000);

            Console.WriteLine("Finished handling " + content);
        }

If we run this now, the messages are processed one at a time and in order just as we expect:

[Image: rabbitmq-async-sync]

Now, let’s change the event handler to an asynchronous one:

        private static async void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);

            Console.WriteLine("Began handling " + content);

            await Task.Delay(1000);

            Console.WriteLine("Finished handling " + content);
        }

If we run this now…

[Image: rabbitmq-async-async]

…we see that our concurrency and ordering guarantees have just gone out the window.

Understanding async void, Revisited

In my recent article, “In-Depth Async in Akka .NET: Why We Need PipeTo()”, I explained what happens when you call async void methods. Let’s recap that.

Say we have this program. We’re calling an async void method in a loop.

        static void Main(string[] args)
        {
            Console.Title = "async void";

            for (int i = 0; i < 5; i++)
                RunJob("Job " + i);

            Console.ReadLine();
        }

        static async void RunJob(string str)
        {
            Console.WriteLine("Start " + str);

            await Task.Delay(1000);

            Console.WriteLine("End " + str);
        }

When you call an async void method, it’s done in a fire-and-forget manner. The caller has no way of knowing whether or when the operation ended, so it just resumes execution immediately, rather than waiting for the async void method to finish. So you end up with parallel and interleaved execution such as this:

[Image: rabbitmq-async-asyncvoid]

If we change RunJob() to be synchronous instead…

        static void RunJob(string str)
        {
            Console.WriteLine("Start " + str);

            Thread.Sleep(1000);

            Console.WriteLine("End " + str);
        }

…you’ll see that everything happens one at a time and in order:

[Image: rabbitmq-async-sync2]

So you have to be really careful when using async void:

  1. There is no way for the caller to await completion of the method.
  2. As a result of this, async void calls are fire-and-forget.
  3. Thus it follows that async void methods (including event handlers) will execute in parallel if called in a loop.
  4. Exceptions can cause the application to crash (see the aforementioned article by Stephen Cleary for more on this).
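For comparison, here is a minimal sketch of the same loop with the job returning Task instead of void, so that the caller can await each call before starting the next. The log list and the shorter delay are my own additions so that the order can be inspected quickly; an async Main needs C# 7.1 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncTaskDemo
{
    // Returns Task rather than void, so the caller can await completion.
    public static async Task RunJobAsync(string name, List<string> log)
    {
        log.Add("Start " + name);
        await Task.Delay(100);
        log.Add("End " + name);
    }

    public static async Task<List<string>> RunAllAsync(int count)
    {
        var log = new List<string>();

        for (int i = 0; i < count; i++)
            await RunJobAsync("Job " + i, log); // the next job starts only after this one ends

        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAllAsync(3))
            Console.WriteLine(line);
    }
}
```

This only helps when you control the caller. With an event raised from inside a third-party library's dispatcher loop, the caller is not yours to change, which is why the handler ends up being async void in the first place.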

Fixing async void event handlers

Despite these problems, if you want to await in your event handler, you have to make it async void. To prevent parallel and interleaved execution, you have to lock. However, you can’t await in a lock block, so you need to use a different synchronisation mechanism such as a semaphore.

My own Dandago.Utilities provides a ScopedAsyncLock that allows you to neatly wrap the critical section in a using block:

        private static ScopedAsyncLockFactory factory = new ScopedAsyncLockFactory();

        private static async void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            using (var scopedLock = await factory.CreateLockAsync())
            {
                var body = e.Body;
                var content = Encoding.UTF8.GetString(body);

                Console.WriteLine("Began handling " + content);

                await Task.Delay(1000);

                Console.WriteLine("Finished handling " + content);
            }
        }

Like this, messages are consumed one at a time, and in order:

[Image: rabbitmq-async-scopedasynclock]

ScopedAsyncLockFactory uses a semaphore underneath, so don’t forget to dispose it!
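If you would rather not take a dependency, the same idea can be sketched directly with SemaphoreSlim from the base class library. This is not how ScopedAsyncLock is implemented, just an illustration of the technique; the names are hypothetical, and the handler returns Task here only so that the demo can wait for everything to finish (in a real event handler the body would sit inside the async void method).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreDemo
{
    // One permit: only one handler may be inside the critical section at a time.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);

    public static readonly List<string> Log = new List<string>();

    // Hypothetical handler; the string stands in for a received message.
    public static async Task HandleAsync(string content)
    {
        await Gate.WaitAsync();
        try
        {
            Log.Add("Began handling " + content);
            await Task.Delay(50);
            Log.Add("Finished handling " + content);
        }
        finally
        {
            Gate.Release(); // always release, even if handling throws
        }
    }

    public static async Task Main()
    {
        // Start all handlers without awaiting in between, as a dispatcher loop would.
        var tasks = new List<Task>();
        for (int i = 0; i < 3; i++)
            tasks.Add(HandleAsync("Message " + i));

        await Task.WhenAll(tasks);
        Log.ForEach(Console.WriteLine);
    }
}
```

The semaphore guarantees that “Began” and “Finished” lines for different messages never interleave. As far as I know, SemaphoreSlim does not document a strict first-in, first-out guarantee for waiters, so treat the ordering between messages as something to verify rather than assume.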

RabbitMQ String Headers Received As Byte Arrays

I ran into a strange issue today with RabbitMQ. When adding custom headers, I found that things like integers would get through to the other end just fine. But when it came to strings, they ended up as byte arrays at the receiving end.

To illustrate this issue, I’ve written a very simple example. The publisher, below, sets two custom headers: a string and an integer:

        static void Main(string[] args)
        {
            Console.Title = "Publisher";

            var factory = new ConnectionFactory();

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // prepare payload and headers

                    var body = Encoding.UTF8.GetBytes("Hello");
                    var props = channel.CreateBasicProperties();
                    props.Headers = new Dictionary<string, object>();
                    props.Headers["Name"] = "Bob";
                    props.Headers["Age"] = 21;

                    // set up queue and exchange

                    channel.QueueDeclare("testqueue", true, false, false, null);
                    channel.ExchangeDeclare("testexchange", "direct");
                    channel.QueueBind("testqueue", "testexchange", "");

                    // publish message

                    channel.BasicPublish("testexchange", "", props, body);

                    Console.ReadLine();
                }
            }
        }

As you can see, both are set correctly in the Headers collection:

[Image: rabbitmq-quirkystring-publish-debug]

Now that we’ve published a message, we can consume it using the following code (practically the same as that in “Getting Started with RabbitMQ with .NET”, except that it also writes out the two custom headers):

        static void Main(string[] args)
        {
            Console.Title = "Consumer";

            var factory = new ConnectionFactory();

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("testqueue", true, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += Consumer_Received;
                    channel.BasicConsume("testqueue", true, consumer);

                    Console.ReadLine();
                }
            }
        }

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);
            var name = e.BasicProperties.Headers["Name"];
            var age = e.BasicProperties.Headers["Age"];

            Console.WriteLine("{0} {1} {2}", name, age, content);
        }

The output, however, is not quite what one would expect:

[Image: rabbitmq-quirkystring-bytearray-output]

In fact, what we received in the consumer is not a string but a byte array, even if the bytes correspond to what we actually sent:

[Image: rabbitmq-quirkystring-consume-debug]

The integer header, however, did not have this problem.

A quick search showed that I’m not the first person to encounter this, as someone pointed out this odd behaviour back in 2012, and it appears there’s a similar issue in the Python implementation.

All I can suggest based on these two links is: if you have a header which you know is a string, just do the byte-to-string conversion yourself:

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);
            var nameBytes = (byte[]) e.BasicProperties.Headers["Name"];
            var name = Encoding.UTF8.GetString(nameBytes);
            var age = e.BasicProperties.Headers["Age"];

            Console.WriteLine("{0} {1} {2}", name, age, content);
        }

As you would expect, this sorts out the problem:

[Image: rabbitmq-quirkystring-string-output]
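If you read several headers, or are not sure in advance which ones will arrive as byte arrays, a small helper can hide the conversion. The sketch below is a hypothetical helper of my own, not part of the RabbitMQ client; it assumes string headers are UTF-8 encoded, as in the example above, and it runs without a broker because it only works on a dictionary.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class HeaderReader
{
    // Hypothetical helper: returns a header as text whether it arrived
    // as a byte array or as some other type (e.g. an integer).
    public static string AsString(IDictionary<string, object> headers, string key)
    {
        if (headers == null || !headers.TryGetValue(key, out var value) || value == null)
            return null; // header missing or empty

        if (value is byte[] bytes)
            return Encoding.UTF8.GetString(bytes);

        return value.ToString();
    }

    public static void Main()
    {
        // Simulates what the consumer sees: the string header is a byte array.
        var headers = new Dictionary<string, object>
        {
            ["Name"] = Encoding.UTF8.GetBytes("Bob"),
            ["Age"] = 21
        };

        Console.WriteLine($"{AsString(headers, "Name")} {AsString(headers, "Age")}");
    }
}
```

In the consumer, this would be called as HeaderReader.AsString(e.BasicProperties.Headers, "Name"), and it also copes with messages that carry no headers at all.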