Tag Archives: RabbitMQ

The Dangers of async void Event Handlers

When using async/await, you’ll want to use async Task methods most of the time, and use async void methods only for event handlers (see “Async/Await – Best Practices in Asynchronous Programming“, by Stephen Cleary, MSDN Magazine, March 2013).

This conventional wisdom works great if you’re building something like a WPF (GUI) application, and event handlers are invoked occasionally as a result of user interaction (e.g. user presses a button, and an event fires). However, there is another class of event handlers that are invoked as part of a dispatcher loop in a third-party library. async void can be pretty dangerous in these cases.

async void Event Handlers in RabbitMQ

Let’s take the example of RabbitMQ. We’ll set up a basic publisher and consumer. A fundamental property of message queues is that messages are delivered in order, and that’s what we expect to happen.

First, install the RabbitMQ Client library via NuGet:

Install-Package RabbitMQ.Client

Then, we can set up a basic publisher and consumer:

        static void Main(string[] args)
        {
            Console.Title = "async RabbitMQ";

            var factory = new ConnectionFactory();

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                const string queueName = "testqueue";

                // create queue if not already there

                channel.QueueDeclare(queueName, true, false, false, null);

                // publish

                var props = channel.CreateBasicProperties();
                    
                for (int i = 0; i < 5; i++)
                {
                    var msgBytes = Encoding.UTF8.GetBytes("Message " + i);
                    channel.BasicPublish("", queueName, props, msgBytes);
                }

                // set up consumer

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += Consumer_Received;
                channel.BasicConsume("testqueue", true, consumer);

                Console.ReadLine();
            }
        }

Our consumer will call the Consumer_Received event handler whenever a message is received. This is the first version of the event handler:

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);

            Console.WriteLine("Began handling " + content);

            Thread.Sleep(1000);

            Console.WriteLine("Finished handling " + content);
        }

If we run this now, the messages are processed one at a time and in order just a we expect:

rabbitmq-async-sync

Now, let’s change the event handler to an asynchronous one:

        private static async void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);

            Console.WriteLine("Began handling " + content);

            await Task.Delay(1000);

            Console.WriteLine("Finished handling " + content);
        }

If we run this now…

rabbitmq-async-async

…we see that our concurrency and ordering guarantees have just gone out the window.

Understanding async void, Revisited

In my recent article, “In-Depth Async in Akka .NET: Why We Need PipeTo()“, I explained what happens when you call async void methods. Let’s recap that.

Say we have this program. We’re calling an async void method in a loop.

        static void Main(string[] args)
        {
            Console.Title = "async void";

            for (int i = 0; i < 5; i++)
                RunJob("Job " + i);

            Console.ReadLine();
        }

        static async void RunJob(string str)
        {
            Console.WriteLine("Start " + str);

            await Task.Delay(1000);

            Console.WriteLine("End " + str);
        }

When you call an async void method, it’s done in a fire-and-forget manner. The caller has no way of knowing whether or when the operation ended, so it just resumes execution immediately, rather than waiting for the async void method to finish. So you end up with parallel and interleaved execution such as this:

rabbitmq-async-asyncvoid

If we change RunJob() to be synchronous instead…

        static void RunJob(string str)
        {
            Console.WriteLine("Start " + str);

            Thread.Sleep(1000);

            Console.WriteLine("End " + str);
        }

…you’ll see that everything happens one at a time and in order:

rabbitmq-async-sync2

So you have to be really careful when using async void:

  1. There is no way for the caller to await completion of the method.
  2. As a result of this, async void calls are fire-and-forget.
  3. Thus it follows that async void methods (including event handlers) will execute in parallel if called in a loop.
  4. Exceptions can cause the application to crash (see the aforementioned article by Stephen Cleary for more on this).

Fixing async void event handlers

Despite these problems, if you want to await in your event handler, you have to make it async void. To prevent parallel and interleaved execution, you have to lock. However, you can’t await in a lock block, so you need to use a different synchronisation mechanism such as a semaphore.

My own Dandago.Utilities provides a ScopedAsyncLock that allows you to neatly wrap the critical section in a using block:

        private static ScopedAsyncLockFactory factory = new ScopedAsyncLockFactory();

        private static async void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            using (var scopedLock = await factory.CreateLockAsync())
            {
                var body = e.Body;
                var content = Encoding.UTF8.GetString(body);

                Console.WriteLine("Began handling " + content);

                await Task.Delay(1000);

                Console.WriteLine("Finished handling " + content);
            }
        }

Like this, messages are consumed one at a time, and in order:

rabbitmq-async-scopedasynclock

ScopedAsyncLockFactory uses a semaphore underneath, so don’t forget to dispose it!

RabbitMQ String Headers Received As Byte Arrays

I ran into a strange issue today with RabbitMQ. When adding custom headers, I found that things like integers would get through to the other end just fine. But when it came to strings, they ended up as byte arrays in the receiving end.

To illustrate this issue, I’ve written a very simple¬†example. The publisher, below, sets two custom headers: a string and an integer:

        static void Main(string[] args)
        {
            Console.Title = "Publisher";

            var factory = new ConnectionFactory();

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // prepare payload and headers

                    var body = Encoding.UTF8.GetBytes("Hello");
                    var props = channel.CreateBasicProperties();
                    props.Headers = new Dictionary<string, object>();
                    props.Headers["Name"] = "Bob";
                    props.Headers["Age"] = 21;

                    // set up queue and exchange

                    channel.QueueDeclare("testqueue", true, false, false, null);
                    channel.ExchangeDeclare("testexchange", "direct");
                    channel.QueueBind("testqueue", "testexchange", "");

                    // publish message

                    channel.BasicPublish("testexchange", "", props, body);

                    Console.ReadLine();
                }
            }
        }

As you can see, this is set correctly in the Headers collection:

rabbitmq-quirkystring-publish-debug

Now that we’ve published a message, we can consume it using the following code (practically the same as that in “Getting Started with RabbitMQ with .NET“, except that it also writes out the two custom headers):

        static void Main(string[] args)
        {
            Console.Title = "Consumer";

            var factory = new ConnectionFactory();

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("testqueue", true, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += Consumer_Received;
                    channel.BasicConsume("testqueue", true, consumer);

                    Console.ReadLine();
                }
            }
        }

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);
            var name = e.BasicProperties.Headers["Name"];
            var age = e.BasicProperties.Headers["Age"];

            Console.WriteLine("{0} {1} {2}", name, age, content);
        }

The output, however, is not quite what one would expect:

rabbitmq-quirkystring-bytearray-output

In fact, what we received in the consumer is not a string but a byte array, even if the bytes correspond to what we actually sent:

rabbitmq-quirkystring-consume-debug

The integer header, however, did not have this problem.

A quick search showed that I’m not the first person to encounter this, as someone pointed out this odd behaviour back in 2012, and it appears there’s a similar issue in the Python implementation.

All I can suggest based on these two links is: if you have a header which you know is a string, just do the byte-to-string conversion yourself:

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);
            var nameBytes = (byte[]) e.BasicProperties.Headers["Name"];
            var name = Encoding.UTF8.GetString(nameBytes);
            var age = e.BasicProperties.Headers["Age"];

            Console.WriteLine("{0} {1} {2}", name, age, content);
        }

As you would expect, this sorts out the problem:

rabbitmq-quirkystring-string-output

Resilient Connections with RabbitMQ .NET Client

Update 21st November 2015: This article explains the logic behind resilient connections, but was not yet a complete solution at the time of writing. Please see updates at the end of the article for the suggested action to take.

My recent article “Getting Started with RabbitMQ with .NET” showed a simple example of message consumption using the RabbitMQ .NET Client. We had this code:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("testqueue", true, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += Consumer_Received;
                    channel.BasicConsume("testqueue", true, consumer);

                    Console.ReadLine();
                }
            }
        }

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);
            Console.WriteLine(content);
        }

That’s straightforward enough. But what happens if the connection breaks? Let’s find out.

To make sure everything’s running fine, we can repeat the test done in the original article: run the program, send a message from the RabbitMQ Management UI, and make sure it is received:

rabbitmq-resilience-first-test

That’s all well and good. Now, let’s restart the RabbitMQ service – that will break the connection. To do this, From Start -> Run, run services.msc to open up the Services running on the system, and restart the one called RabbitMQ:

rabbitmq-resilience-restart-rabbitmq

If you try publishing another message to the queue, you’ll find that the consumer won’t receive it. In fact, if you check the queue from the management UI, you’ll find that the message is still there. If you restart the client program (starting a new connection), the new message will be able to reach the client.

Although restarting services to re-establish connections may be common practice in some companies (*cough *cough*), it’s not something we want to encourage, so we need a mechanism that allows the connection to be re-established once the service is available again.

This StackOverflow answer pretty much covers the things we need to take care of to make our connections resilient.

First, we can add a heartbeat:

            var factory = new ConnectionFactory() { HostName = "localhost",
                RequestedHeartbeat = 30 };

This doesn’t solve the scenario presented above, but handles an edge case that occurs if a message is taken by a consumer but the connection dies before it gets acknowledged:

“Turning the heartbeat on also makes the server check to see if the connection is still up, which can be very important. If a connection goes bad after a message has been picked up by the subscriber but before it’s been acknowledged, the server just assumes that the client is taking a long time, and the message gets “stuck” on the dead connection until it gets closed. With the heartbeat turned on, the server will recognize when the connection goes bad and close it, putting the message back in the queue so another subscriber can handle it. Without the heartbeat, I’ve had to go in manually and close the connection in the Rabbit management UI so that the stuck message can get passed to a subscriber.”

Since this scenario is a bit tricky to reproduce, I’ll take the author’s word for it. But now, on to our scenario. This part of the answer gives us something to think about:

“Finally, you will have to handle what your consumer does when trying to consume messages from a closed connection. Unfortunately, each different way of consuming messages from a queue in the Rabbit client seems to react differently. QueueingBasicConsumer throws EndOfStreamException if you call QueueingBasicConsumer.Queue.Dequeue on a closed connection. EventingBasicConsumer does nothing, since it’s just waiting for a message.”

In our case we’re using EventingBasicConsumer, and the test we performed earlier showed that in case of disconnection, messages are no longer received, but no exceptions are thrown either. In this case, we need a way to detect when a connection breaks. Fortunately, there’s an event that fires when that happens:

                connection.ConnectionShutdown += Connection_ConnectionShutdown;

We’ll need to refactor our code so that we can recreate the connection, channel and consumer when reconnecting. Let’s move our variables outside of the Main() method:

        private static ConnectionFactory factory;
        private static IConnection connection;
        private static IModel channel;
        private static EventingBasicConsumer consumer;

We’ll split up our Main() method to make it easier to manage:

        static void Main(string[] args)
        {
            factory = new ConnectionFactory() { HostName = "localhost",
                RequestedHeartbeat = 30 };

            Connect();

            Console.ReadLine();

            Cleanup();
        }

        static void Connect()
        {
            connection = factory.CreateConnection();
            connection.ConnectionShutdown += Connection_ConnectionShutdown;

            channel = connection.CreateModel();
            channel.QueueDeclare("testqueue", true, false, false, null);

            consumer = new EventingBasicConsumer(channel);
            consumer.Received += Consumer_Received;
            channel.BasicConsume("testqueue", true, consumer);
        }

        static void Cleanup()
        {
            try
            {
                if (channel != null && channel.IsOpen)
                {
                    channel.Close();
                    channel = null;
                }

                if (connection != null && connection.IsOpen)
                {
                    connection.Close();
                    connection = null;
                }
            }
            catch(IOException ex)
            {
                // Close() may throw an IOException if connection
                // dies - but that's ok (handled by reconnect)
            }
        }

What remains is the implementation of Connection_ConnectionShutdown.

A very rudimentary implementation of reconnection could be as follows:

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connection broke!");

            Cleanup();

            while (true)
            {
                try
                {
                    Connect();

                    Console.WriteLine("Reconnected!");
                    break;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Reconnect failed!");
                    Thread.Sleep(3000);
                }
            }
        }

If you test this, you’ll see that it actually works: it reconnects, and messages you send after reconnection are received:

rabbitmq-resilience-reconnect-works

This sorts out what we set out to achieve, but we’re not quite done yet.

For starters, Thread.Sleep() sucks. We can make the reconnect code more efficient by using something like a ManualResetEventSlim. A ManualResetEventSlim is like a semaphore, but only has on and off (Set and Reset) states. Although it is mostly useful in multithreading scenarios, we can use it instead of Thread.Sleep() to periodically reconnect. At face value, the following code should behave the same way as the code above:

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connection broke!");

            Cleanup();

            var mres = new ManualResetEventSlim(false); // state is initially false

            while (!mres.Wait(3000)) // loop until state is true, checking every 3s
            {
                try
                {
                    Connect();

                    Console.WriteLine("Reconnected!");
                    mres.Set(); // state set to true - breaks out of loop
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Reconnect failed!");
                }
            }
        }

That’s nice. But we also have to handle the case where service wasn’t running from the start, and the initial connection attempt failed (the current code will explode in this scenario).

No problem. All we have to do is call the same reconnection logic when connecting the first time. Since we don’t want to call the event handler directly, let’s move the logic into its own method. Note that I’ve also changed the output strings from “Reconnect” to “Connect” so that they apply to all connection attempts.

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connection broke!");

            Reconnect();
        }

        private static void Reconnect()
        {
            Cleanup();

            var mres = new ManualResetEventSlim(false); // state is initially false

            while (!mres.Wait(3000)) // loop until state is true, checking every 3s
            {
                try
                {
                    Connect();

                    Console.WriteLine("Connected!");
                    mres.Set(); // state set to true - breaks out of loop
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Connect failed!");
                }
            }
        }

Now, Main() can just call Reconnect():

        static void Main(string[] args)
        {
            factory = new ConnectionFactory() { HostName = "localhost",
                RequestedHeartbeat = 30 };

            Reconnect();

            Console.ReadLine();

            Cleanup();
        }

And testing that out, we find that it works nicely as well:

rabbitmq-resilience-connect-works

But you’ll also find that we broke the case where the user presses ENTER to exit the program gracefully, as the cleanup logic will cause the ConnectionShutdown to be fired, triggering the reconnect logic. We can sort this out by disconnecting the event handler before the final cleanup:

        static void Main(string[] args)
        {
            factory = new ConnectionFactory() { HostName = "localhost",
                RequestedHeartbeat = 30 };

            Reconnect();

            Console.ReadLine();

            connection.ConnectionShutdown -= Connection_ConnectionShutdown;
            Cleanup();
        }

All this should work pretty nicely. Additionally, since we’ve neatly separated setup from cleanup, it’s really easy to put this code into a Windows service and allocate the appropriate parts into Start() and Stop() methods.

You can find the source code for this article here at the Gigi Labs BitBucket repository.

Update 12th November 2015: In the .NET/C# API Guide, you’ll find that there’s an AutomaticRecoveryEnabled property on the ConnectionFactory which takes care of reconnecting if the connection goes down. However, it does not deal with the case when the initial connection fails (in which case an exception is thrown).

Update 21st November 2015: Apparently handling reconnect as shown here causes problems with acknowledgements when the connection breaks. The best approach is a combination of iterative reconnect for the first connection, and AutomaticRecoveryEnabled for subsequent disconnects.

Connecting to RabbitMQ Across Machines with .NET Client

My earlier article “Getting Started with RabbitMQ with .NET” showed how it’s easy to use the RabbitMQ .NET Client to connect to an instance of RabbitMQ running locally. In fact, omitting the code that declares a queue and consumes messages, we are essentially left with this:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            using (var connection = factory.CreateConnection())
            {
                Console.WriteLine("Connected!");

                Console.ReadLine();
            }
        }

If we replace localhost with the hostname of a machine which actually does have a RabbitMQ instance running, however, we’re in for a surprise:

rabbitmq-connect-across-pcs-error

This StackOverflow answer suggests that we may need a username and password when setting up the connection. We can add these by changing our factory declaration as follows:

            var factory = new ConnectionFactory()
            {
                HostName = "MyOtherPc",
                UserName = "joe",
                Password = "joe"
            };

Connections using the default guest user don’t seem to work across machines, so we’re going to have to create a user account.

From the RabbitMQ Management UI, add a new user, giving him all tags available (administrator,monitoring,policymaker,management):

rabbitmq-new-user

That’s not enough to connect using our new user, though. You can see why if you click on the user:

rabbitmq-permission-virtual-directory

You can easily rectify the problem by clicking on that Set permission button to give the new user access to the root (/) virtual host. Note that this can also be done from the Virtual Hosts section… but doing it from the user page is easier.

rabbitmq-connect-across-machines-success

It works now. Yay.

Easy RabbitMQ Messaging with EasyNetQ

This article is about EasyNetQ, a library for using RabbitMQ with .NET. At the time of writing this article, EasyNetQ is still prerelease software. Despite that, several people have been using it in production environments for a while. If you use it in production, it’s at your own risk.

Last August I wrote “Getting Started with RabbitMQ with .NET“. This illustrated, among other things, how you can receive messages from a RabbitMQ queue using the official RabbitMQ .NET client library.

While that’s a perfectly fine thing to do, it’s often preferable to use a higher-level library that takes care of the little details and allows you to focus on the actual messaging in a structured way. Well, that’s why EasyNetQ exists. With EasyNetQ, you don’t have to worry about stuff like serialization, messaging patterns, connection reliability, etc. In the rest of this article, we’ll see how easy it is to send and receive messages with EasyNetQ.

The first thing we need to do, of course, is to install the EasyNetQ NuGet package:

easynetq-install-package

Then, let’s create a class that represents the messages we will be sending. In this example, we’ll pretend we’re sending the position of a player in a game:

    public class PlayerPosition
    {
        public int X { get; set; }
        public int Y { get; set; }
    }

Once we have that, we can subscribe to messages arriving in a queue. This is done like this:

            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                bus.Subscribe<PlayerPosition>("MyGame", playerPosition =>
                    Console.WriteLine($"{playerPosition.X}, {playerPosition.Y}"));

                Console.ReadLine();
            }

So you declare your queue using RabbitHutch.CreateBus(). Bus is just a queue. So what’s a RabbitHutch? Well, what can I say? It’s a rabbit cage.

Facepalmorangflipped

Note how we are subscribing specifically to receive a message of type PlayerPosition. We are using a subscriptionId of “MyGame”, and we are also passing in a handler that specifies what to do with the received message. If the syntax looks a little unfamiliar, that’s because I’m using string interpolation, a new C# language feature introduced in C# 6.0.

When we run this, we can see that a queue is created, with the name built from the message class name and the subscriptionId:

easynetq-newqueue

Now in order to understand what’s going through the queue, let’s stop the subscriber for the time being.

We can send a message on the queue as follows:

            using (var bus = RabbitHutch.CreateBus("host=localhost"))
            {
                var playerPosition = new PlayerPosition() { X = 1, Y = 2 };
                bus.Publish<PlayerPosition>(playerPosition);

                Console.ReadLine();
            }

Note how we’re sending a strongly typed object (as opposed to a loose string), which we supply as the generic parameter. You can of course leave out the generic parameter, but I am including it above for clarity.

We can now check out the message in the queue from the “Get messages” section in the RabbitMQ management web interface:

easynetq-message-example

As you can see, the data we sent is serialized into JSON, and it is also accompanied by type information that helps EasyNetQ to deserialize it at the receiving end.

In fact, we can now run our subscriber again, and see that it consumes the message:

easynetq-message-received

So you can see how EasyNetQ makes it really easy to focus on the messaging and leave the details to it. Naturally, there’s a lot more you can do with EasyNetQ than this introductory article illustrates. Check out the EasyNetQ documentation for more info.

Getting Started with RabbitMQ with .NET

This article contains some instructions to make it easy for people to jump into the wonderful world of message queues with RabbitMQ. Its purpose is not to provide any in-depth explanations, as I’m pretty new to the topic myself. ūüôā

RabbitMQ is built on Erlang, so you’ll need to install that first:

rabbitmq-download-erlang

You can then download and install RabbitMQ itself:

rabbitmq-download-rabbitmq

Next, you should install the management plugin. This provides a web UI allowing you to manage your queue and perform basic testing (e.g. publish messages).

To do this, navigate to the folder where you installed RabbitMQ, and go into the sbin folder. From here, enter the following command:

rabbitmq-plugins enable rabbitmq_management

Once that is installed, you can access the administrative web UI by going to http://localhost:15672/ with username guest and password guest. Go to the Queues tab and create a new queue:

rabbitmq-add-queue

You’ll now see the new queue listed under “All queues”. Click on the name to select it. This will allow you to view information about the queue (including statistics, such as the number of messages in the queue) and perform a number of operations (including publishing messages).

But before we do that, let’s create a simple client in .NET that can subscribe to the queue and consume any messages we publish. Some basic code is available in this tutorial, so we can just install the RabbitMQ.Client NuGet package and then write some code based on it:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("testqueue", true, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += Consumer_Received;
                    channel.BasicConsume("testqueue", true, consumer);

                    Console.ReadLine();
                }
            }
        }

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);
            Console.WriteLine(content);
        }

We can now run this client, and leave it waiting for messages.

To test it, let’s publish a message from the RabbitMQ management web UI:

rabbitmq-publish-message

You can see from the client that the message has been received:

rabbitmq-consume-message

So that shows in a very basic way how you can setup RabbitMQ on Windows, publish messages from the management web UI, and consume messages by writing a simple .NET client.

Update 2nd March 2016: If the management plugin fails to install with something like “unable to contact node”, here are a few things you can try: