Asynchronous RabbitMQ Consumers in .NET

It’s quite common to do some sort of I/O operation (e.g. REST call) whenever a message is consumed by a RabbitMQ client. This should be done asynchronously, but it’s not as simple as changing the event handling code to async void.

In “The Dangers of async void Event Handlers”, I explained how making an event handler async void will mess up the message order, because the dispatcher loop will not wait for a message to be fully processed before calling the handler on the next one.
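To make the problem concrete without needing a broker, here is a minimal sketch that imitates a synchronous dispatcher loop raising an event. The Received event, the AsyncVoidDemo class and the Record helper are stand-ins invented for this illustration and are not part of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical stand-in for a synchronous "message received" event.
    private static event EventHandler<int> Received;

    private static readonly List<string> Log = new List<string>();

    // async void: control returns to the caller at the first await,
    // and the caller has no Task it could wait on.
    private static async void OnReceived(object sender, int messageNumber)
    {
        Record($"Begin {messageNumber}");
        await Task.Delay(250); // simulated I/O
        Record($"End {messageNumber}");
    }

    private static void Record(string line)
    {
        lock (Log)
        {
            Log.Add(line);
        }
        Console.WriteLine(line);
    }

    public static List<string> Run()
    {
        lock (Log)
        {
            Log.Clear();
        }

        Received += OnReceived;

        // Imitates the dispatcher loop: each Invoke returns as soon as the
        // handler reaches its first await, so all three "Begin" lines are
        // written before any "End" line.
        for (int i = 1; i <= 3; i++)
        {
            Received?.Invoke(null, i);
        }

        Thread.Sleep(1000); // give the pending continuations time to finish
        Received -= OnReceived;

        lock (Log)
        {
            return new List<string>(Log);
        }
    }

    public static void Main()
    {
        Run();
    }
}
```

The loop has moved on to message 3 before message 1 has finished, which is exactly the loss of ordering described above.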

While that article provided a workaround that is great to use with older versions of the RabbitMQ Client library, it turns out that there is an AsyncEventingBasicConsumer as of RabbitMQ.Client 5.0.0-pre3, which works great for asynchronous message consumption.

AsyncEventingBasicConsumer Example

First, we need to make sure that the RabbitMQ client library is installed.

Install-Package RabbitMQ.Client

Then, we can set up a publisher and consumer to show how to use the AsyncEventingBasicConsumer. Since this is just a demonstration, we can have both in the same process:

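        // Requires: using System.Text; using System.Threading;
        //           using RabbitMQ.Client; using RabbitMQ.Client.Events;
        static void Main(string[] args)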
        {
            var factory = new ConnectionFactory() { DispatchConsumersAsync = true };
            const string queueName = "myqueue";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // durable queue; not exclusive, not auto-deleted, no extra arguments
                channel.QueueDeclare(queueName, true, false, false, null);

                // consumer

                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.Received += Consumer_Received;
                // second argument is autoAck: messages are acknowledged as soon as they are delivered
                channel.BasicConsume(queueName, true, consumer);

                // publisher

                var props = channel.CreateBasicProperties();
                int i = 0;

                while (true)
                {
                    var messageBody = Encoding.UTF8.GetBytes($"Message {++i}");
                    channel.BasicPublish("", queueName, props, messageBody);
                    Thread.Sleep(50);
                }
            }
        }

There is nothing really special about the above code except that we’re using AsyncEventingBasicConsumer instead of EventingBasicConsumer, and that the ConnectionFactory is now being set up with a suspicious-looking DispatchConsumersAsync property set to true. The ConnectionFactory is using defaults, so it will connect to localhost using the guest account.
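For reference, the defaults mentioned above can be spelled out explicitly. This is only a sketch: the FactorySetup class is a name made up for this example, and the host and credentials shown are simply the library defaults for a local broker.

```csharp
using RabbitMQ.Client;

public static class FactorySetup
{
    public static ConnectionFactory Create()
    {
        return new ConnectionFactory
        {
            HostName = "localhost",        // same as the default
            UserName = "guest",            // same as the default
            Password = "guest",            // same as the default
            DispatchConsumersAsync = true  // not the default; needed for AsyncEventingBasicConsumer
        };
    }
}
```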

The message handler is expected to return Task, and this makes it very easy to use proper asynchronous code:

        private static async Task Consumer_Received(object sender, BasicDeliverEventArgs @event)
        {
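            // Body is a byte[] in RabbitMQ.Client 5.x; from 6.x it is a ReadOnlyMemory<byte>, so use @event.Body.ToArray() there
            var message = Encoding.UTF8.GetString(@event.Body);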

            Console.WriteLine($"Begin processing {message}");

            await Task.Delay(250);

            Console.WriteLine($"End processing {message}");
        }

The messages are indeed processed in order: each message’s “End processing” line is written before the next message’s “Begin processing” line.
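In a real consumer, the Task.Delay would be replaced by actual I/O such as the REST call mentioned at the start. The following is a rough sketch of what that might look like, assuming RabbitMQ.Client 5.x (where Body is a byte[]). The RestForwardingHandler class and the endpoint URL are hypothetical, and error handling is deliberately left out.

```csharp
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;

public static class RestForwardingHandler
{
    // A single shared HttpClient, reused across messages.
    private static readonly HttpClient Http = new HttpClient();

    // Hypothetical endpoint, for illustration only.
    private const string EndpointUrl = "http://localhost:5000/api/messages";

    public static async Task Consumer_Received(object sender, BasicDeliverEventArgs @event)
    {
        var message = Encoding.UTF8.GetString(@event.Body);

        using (var content = new StringContent(message, Encoding.UTF8, "text/plain"))
        using (var response = await Http.PostAsync(EndpointUrl, content))
        {
            // Throws if the endpoint did not return a success status code.
            response.EnsureSuccessStatusCode();
        }
    }
}
```

It would be attached in the same way as before, with consumer.Received += RestForwardingHandler.Consumer_Received. Keep in mind that the example consumer uses automatic acknowledgement, so a message whose REST call fails is not redelivered.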

How to Mess This Up

Remember that DispatchConsumersAsync property? I haven’t really found any documentation explaining what it actually does, but we can venture a guess after a couple of experiments.

First, let’s keep that property, but use a synchronous EventingBasicConsumer instead (which also means changing the event handler to have a void return type). When we run this, we get an error saying “In the async mode you have to use an async consumer”, which I suppose is fair enough.

So now, let’s go back to using an AsyncEventingBasicConsumer, but leave out the DispatchConsumersAsync property:

var factory = new ConnectionFactory();

This time, you’ll see that the event handler is not firing (nothing is being written to the console). The messages are indeed being published, and the queue remains at zero messages, so they are being consumed (you’ll see them accumulate if you disable the consumer).

This is actually quite dangerous, yet there is no error like the one we saw earlier. It means that if a developer forgets to set that DispatchConsumersAsync property, then all messages are lost: the consumer was registered with automatic acknowledgement (the true passed to BasicConsume), so the broker discards each message as soon as it is delivered, even though the handler never runs. It’s also quite strange that the choice of how to dispatch messages to the consumer (i.e. sync or async) is a property of the connection rather than the consumer, although presumably it would be a result of some internal plumbing in the RabbitMQ Client library.
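Since the library does not complain in this case, one option is to add a check of our own. The helper below is only a sketch and not something the library provides: AsyncConsumerGuard is a made-up name, and it assumes that the factory passed in is the same one that created the connection behind the channel.

```csharp
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class AsyncConsumerGuard
{
    // Creates an AsyncEventingBasicConsumer only if the factory was set up for async dispatch.
    public static AsyncEventingBasicConsumer Create(ConnectionFactory factory, IModel channel)
    {
        if (factory == null)
        {
            throw new ArgumentNullException(nameof(factory));
        }

        if (!factory.DispatchConsumersAsync)
        {
            throw new InvalidOperationException(
                "DispatchConsumersAsync must be true when using AsyncEventingBasicConsumer.");
        }

        return new AsyncEventingBasicConsumer(channel);
    }
}
```

In the earlier example, this would replace the direct constructor call with var consumer = AsyncConsumerGuard.Create(factory, channel), turning a silent loss of messages into an exception at startup.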

Summary

AsyncEventingBasicConsumer is great for having pure asynchronous RabbitMQ consumers, but don’t forget that DispatchConsumersAsync property.

It has only been available since RabbitMQ.Client 5.0.0-pre3, so if you’re on an older version, use the workaround described in “The Dangers of async void Event Handlers” instead.

13 thoughts on “Asynchronous RabbitMQ Consumers in .NET”

  1. Thanks for the article. But is it really asynchronous? Imagine that instead of await Task.Delay(250); you have some random delay (100 – 5000) simulating some load. Then those messages are still being processed one after another.

    How to achieve that state, where while one message is waiting for some resources (Task.Delay), another message processing can start?

    Many thanks.

    1. Yes, it is asynchronous. What you’re after is actually concurrency. I have another article in this series that explains the difference.

      Processing one message after another is often the whole point of a queue, because it allows you to maintain ordering. However, you can achieve concurrent processing simply by having multiple consumers for the same queue. In this way, you have a load-balanced worker type of scenario.

      The way you’ll do this really depends on your application (e.g. is ordering important?). It is also possible that a queue is not the best tool for what you need.

      1. Well, I am not exactly after concurrency. I am perfectly fine with a queue because I want to process messages from oldest to newest.

        Every message to me is a new task I need to process – it consists of getting data from the DB, doing some calculation and storing the result to the DB.

        And while it is waiting for the DB, it can accept another message and start another task. Then (when the second task starts waiting) it can switch back to the first task and do the calculation.

        1. Yes but if ordering is important, consuming multiple messages (e.g. via multiple consumers) can be an issue. Consider this:

          1. Consumer A takes message 1.
          2. Consumer B takes message 2.
          3. Consumer B finishes message 2 and commits.
          4. Consumer A finishes message 1 and commits.

          That’s suddenly out of order, and it’s easy for that to happen simply because a consumer takes a little long to process a message.

          1. First of all – thanks for your replies. It is appreciated.

            Second – I should probably have mentioned this earlier – I am planning to use this Rabbit queue as a buffer (in a microservice architecture). It should process messages from oldest to newest (so that nothing stays unprocessed for a long time), but it does not matter if something finishes earlier. So the behaviour in your example is expected. It is not a problem.

            1. Consumer A takes message 1.
            2. Consumer B takes message 2.
            3. Consumer B finishes message 2 and commits.
            4. Consumer A finishes message 1 and commits.

            This works for me 🙂

            But I am wondering – what is the advantage of using async rabbit consumer when it works synchronously then?

            Thanks again

          2. Async rabbit consumer is useful when your consumer’s event handler needs to do async stuff (e.g. talk to an HTTP endpoint, write a file, etc). This means some kind of I/O most of the time. Async calls (think async/await) are traditionally dangerous to do in normal synchronous event handlers – see my article on the subject for more detail.

          3. I see, thanks. Basically, async void (fire and forget) behaviour is partially what I need :-D, but… I shouldn’t use it.

            Maybe I should modify AsyncEventingBasicConsumer.cs a bit so it can process more tasks with Task.WaitAll() (I can use prefetchCount here to limit the number of tasks), then it probably could work.

            I need to think about it…

            Thanks again. Now it makes more sense.

          4. I would caution against attempting to abuse frameworks for use cases outside their scope – there is room for some very nasty surprises. Probably you should find a better tool for the job, if you just want to process messages in batches.

          5. It processes messages asynchronously. So it doesn’t wait until some message is processed and starts another processing while the previous one waits for server (disk, web, …)

            Example:
            Let’s say that with every message I want to do 3 steps.
            1. add some number;
            2. save to DB;
            3. send result to another Rabbit queue;

            Three messages are in queue for processing. Message A, B and C.

            Here is the way how I wanted it working:
            1) A is started and some number is added to it
            2) A starts saving to DB (long operation)
            3) B is started and some number is added to it
            4) B starts saving to DB (long operation)
            5) Meanwhile A saving to DB is completed, so A starts sending result to another Rabbit queue (long operation)
            6) C is started
            7) C starts saving to DB (long operation)
            8) now B saving to DB is completed, so B starts sending result to another Rabbit queue (long operation)
            9) A is finished.
            10) now C saving to DB is completed, so C starts sending result to another Rabbit queue (long operation)
            11) B – There was some problem with sending result to another Rabbit queue -> retry
            12) C is finished
            13) B is finished

            I do not care in which order those messages are processed. I do care about maximum efficiency – therefore I do not want to wait until A is completely processed and then start processing B.

            That is exactly what worked for me with EasyNetQ.

        2. Technically, the scenario you describe can probably be achieved via Task.WhenAny() (see my article on async patterns). But I don’t think this is very feasible with a regular queue (e.g. RabbitMQ) which gives you one message at a time and you can’t really start the next message until you’re done with the current one.

          Some kinds of queue do allow you to get a chunk of messages and process them in one go though (e.g. AWS SQS).
