Resilient Connections with RabbitMQ .NET Client

Update 21st November 2015: This article explains the logic behind resilient connections, but was not yet a complete solution at the time of writing. Please see updates at the end of the article for the suggested action to take.

My recent article “Getting Started with RabbitMQ with .NET” showed a simple example of message consumption using the RabbitMQ .NET Client. We had this code:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("testqueue", true, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += Consumer_Received;
                    channel.BasicConsume("testqueue", true, consumer);

                    Console.ReadLine();
                }
            }
        }

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);
            Console.WriteLine(content);
        }

That’s straightforward enough. But what happens if the connection breaks? Let’s find out.

To make sure everything’s running fine, we can repeat the test done in the original article: run the program, send a message from the RabbitMQ Management UI, and make sure it is received:

rabbitmq-resilience-first-test

That’s all well and good. Now, let’s restart the RabbitMQ service – that will break the connection. To do this, From Start -> Run, run services.msc to open up the Services running on the system, and restart the one called RabbitMQ:

rabbitmq-resilience-restart-rabbitmq

If you try publishing another message to the queue, you’ll find that the consumer won’t receive it. In fact, if you check the queue from the management UI, you’ll find that the message is still there. If you restart the client program (starting a new connection), the new message will be able to reach the client.

Although restarting services to re-establish connections may be common practice in some companies (*cough *cough*), it’s not something we want to encourage, so we need a mechanism that allows the connection to be re-established once the service is available again.

This StackOverflow answer pretty much covers the things we need to take care of to make our connections resilient.

First, we can add a heartbeat:

            var factory = new ConnectionFactory() { HostName = "localhost",
                RequestedHeartbeat = 30 };

This doesn’t solve the scenario presented above, but handles an edge case that occurs if a message is taken by a consumer but the connection dies before it gets acknowledged:

“Turning the heartbeat on also makes the server check to see if the connection is still up, which can be very important. If a connection goes bad after a message has been picked up by the subscriber but before it’s been acknowledged, the server just assumes that the client is taking a long time, and the message gets “stuck” on the dead connection until it gets closed. With the heartbeat turned on, the server will recognize when the connection goes bad and close it, putting the message back in the queue so another subscriber can handle it. Without the heartbeat, I’ve had to go in manually and close the connection in the Rabbit management UI so that the stuck message can get passed to a subscriber.”

Since this scenario is a bit tricky to reproduce, I’ll take the author’s word for it. But now, on to our scenario. This part of the answer gives us something to think about:

“Finally, you will have to handle what your consumer does when trying to consume messages from a closed connection. Unfortunately, each different way of consuming messages from a queue in the Rabbit client seems to react differently. QueueingBasicConsumer throws EndOfStreamException if you call QueueingBasicConsumer.Queue.Dequeue on a closed connection. EventingBasicConsumer does nothing, since it’s just waiting for a message.”

In our case we’re using EventingBasicConsumer, and the test we performed earlier showed that in case of disconnection, messages are no longer received, but no exceptions are thrown either. In this case, we need a way to detect when a connection breaks. Fortunately, there’s an event that fires when that happens:

                connection.ConnectionShutdown += Connection_ConnectionShutdown;

We’ll need to refactor our code so that we can recreate the connection, channel and consumer when reconnecting. Let’s move our variables outside of the Main() method:

        private static ConnectionFactory factory;
        private static IConnection connection;
        private static IModel channel;
        private static EventingBasicConsumer consumer;

We’ll split up our Main() method to make it easier to manage:

        static void Main(string[] args)
        {
            factory = new ConnectionFactory() { HostName = "localhost",
                RequestedHeartbeat = 30 };

            Connect();

            Console.ReadLine();

            Cleanup();
        }

        static void Connect()
        {
            connection = factory.CreateConnection();
            connection.ConnectionShutdown += Connection_ConnectionShutdown;

            channel = connection.CreateModel();
            channel.QueueDeclare("testqueue", true, false, false, null);

            consumer = new EventingBasicConsumer(channel);
            consumer.Received += Consumer_Received;
            channel.BasicConsume("testqueue", true, consumer);
        }

        static void Cleanup()
        {
            try
            {
                if (channel != null && channel.IsOpen)
                {
                    channel.Close();
                    channel = null;
                }

                if (connection != null && connection.IsOpen)
                {
                    connection.Close();
                    connection = null;
                }
            }
            catch(IOException ex)
            {
                // Close() may throw an IOException if connection
                // dies - but that's ok (handled by reconnect)
            }
        }

What remains is the implementation of Connection_ConnectionShutdown.

A very rudimentary implementation of reconnection could be as follows:

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connection broke!");

            Cleanup();

            while (true)
            {
                try
                {
                    Connect();

                    Console.WriteLine("Reconnected!");
                    break;
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Reconnect failed!");
                    Thread.Sleep(3000);
                }
            }
        }

If you test this, you’ll see that it actually works: it reconnects, and messages you send after reconnection are received:

rabbitmq-resilience-reconnect-works

This sorts out what we set out to achieve, but we’re not quite done yet.

For starters, Thread.Sleep() sucks. We can make the reconnect code more efficient by using something like a ManualResetEventSlim. A ManualResetEventSlim is like a semaphore, but only has on and off (Set and Reset) states. Although it is mostly useful in multithreading scenarios, we can use it instead of Thread.Sleep() to periodically reconnect. At face value, the following code should behave the same way as the code above:

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connection broke!");

            Cleanup();

            var mres = new ManualResetEventSlim(false); // state is initially false

            while (!mres.Wait(3000)) // loop until state is true, checking every 3s
            {
                try
                {
                    Connect();

                    Console.WriteLine("Reconnected!");
                    mres.Set(); // state set to true - breaks out of loop
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Reconnect failed!");
                }
            }
        }

That’s nice. But we also have to handle the case where service wasn’t running from the start, and the initial connection attempt failed (the current code will explode in this scenario).

No problem. All we have to do is call the same reconnection logic when connecting the first time. Since we don’t want to call the event handler directly, let’s move the logic into its own method. Note that I’ve also changed the output strings from “Reconnect” to “Connect” so that they apply to all connection attempts.

        private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("Connection broke!");

            Reconnect();
        }

        private static void Reconnect()
        {
            Cleanup();

            var mres = new ManualResetEventSlim(false); // state is initially false

            while (!mres.Wait(3000)) // loop until state is true, checking every 3s
            {
                try
                {
                    Connect();

                    Console.WriteLine("Connected!");
                    mres.Set(); // state set to true - breaks out of loop
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Connect failed!");
                }
            }
        }

Now, Main() can just call Reconnect():

        static void Main(string[] args)
        {
            factory = new ConnectionFactory() { HostName = "localhost",
                RequestedHeartbeat = 30 };

            Reconnect();

            Console.ReadLine();

            Cleanup();
        }

And testing that out, we find that it works nicely as well:

rabbitmq-resilience-connect-works

But you’ll also find that we broke the case where the user presses ENTER to exit the program gracefully, as the cleanup logic will cause the ConnectionShutdown to be fired, triggering the reconnect logic. We can sort this out by disconnecting the event handler before the final cleanup:

        static void Main(string[] args)
        {
            factory = new ConnectionFactory() { HostName = "localhost",
                RequestedHeartbeat = 30 };

            Reconnect();

            Console.ReadLine();

            connection.ConnectionShutdown -= Connection_ConnectionShutdown;
            Cleanup();
        }

All this should work pretty nicely. Additionally, since we’ve neatly separated setup from cleanup, it’s really easy to put this code into a Windows service and allocate the appropriate parts into Start() and Stop() methods.

You can find the source code for this article here at the Gigi Labs BitBucket repository.

Update 12th November 2015: In the .NET/C# API Guide, you’ll find that there’s an AutomaticRecoveryEnabled property on the ConnectionFactory which takes care of reconnecting if the connection goes down. However, it does not deal with the case when the initial connection fails (in which case an exception is thrown).

Update 21st November 2015: Apparently handling reconnect as shown here causes problems with acknowledgements when the connection breaks. The best approach is a combination of iterative reconnect for the first connection, and AutomaticRecoveryEnabled for subsequent disconnects.

3 thoughts on “Resilient Connections with RabbitMQ .NET Client”

  1. This is a good article. As a con I guess there will be a timeout as connection is already closed at the time you are executing Cleanup(). So better to dispose Connection and Channel, otherwise there will have garbage in RabbitMQ.

  2. If you close the channel and queue normally (not due to nw error) ShutdownEventArgs.Cause will be null, so you can check on that whether to reconnect.

Leave a Reply

Your email address will not be published. Required fields are marked *