
Patterns for Asynchronous Composite Tasks in C#

In the previous two articles, I’ve explained why and how to use async/await for asynchronous programming in C#.

Now, we will turn our attention to more interesting things that we can do when combining multiple tasks within the same method.

Update 22nd September 2018: Another pattern not covered here is fire-and-forget. There are many ways to achieve this, including simply not awaiting (causes warnings – see ways to ignore them), using Task.Run(), using Task.Factory.StartNew(), or async void (not recommended, see “Common Mistakes in Asynchronous Programming with .NET“). This is suitable when you want to trigger some kind of processing but don’t care whether/when it completes. It doesn’t really fit the fast food scenario used in this article: placing an order without ever being notified of its completion/failure is sure to annoy customers. Which I suppose is also why applying for jobs is such a pain in the ass for many people.
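
As a rough illustration of fire-and-forget, here is a minimal sketch of a helper that lets a task run unobserved while still reporting failures. The Forget() extension method and its onError callback are hypothetical names made up for this example; they are not part of .NET or of the articles linked above.

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForgetExtensions
{
    // Hypothetical helper: lets a task run unobserved, but reports a failure
    // through the supplied callback instead of silently losing it.
    public static void Forget(this Task task, Action<Exception> onError)
    {
        // The continuation only runs if the task faulted, so t.Exception is not null here.
        task.ContinueWith(
            t => onError(t.Exception.GetBaseException()),
            TaskContinuationOptions.OnlyOnFaulted);
    }
}
```

A call site might then look like SomeWorkAsync().Forget(ex => Console.WriteLine(ex.Message)), where SomeWorkAsync() stands in for whatever task-returning method you want to trigger. The caller still never learns when the work completes, which is the whole point of the pattern.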

Fast Food Example

In order to see each pattern at work, we need a simple example involving multiple tasks. Imagine you walk into your favourite fast food restaurant, and order a meal involving a burger, fries and a drink. Each of these takes a different amount of time to prepare, and the total time of the order may vary depending on how the execution of these three tasks takes place.

Sequential Tasks

The simplest approach is to just execute tasks one after another, waiting for one to finish before starting the next.

        static void Main(string[] args)
        {
            OrderAsync();
            Console.ReadLine();
        }

        static async void OrderAsync()
        {
            var stopwatch = Stopwatch.StartNew();

            await Task.Delay(3000)
                .ContinueWith(task => ShowCompletion("Fries", stopwatch.Elapsed));
            await Task.Delay(1000)
                .ContinueWith(task => ShowCompletion("Drink", stopwatch.Elapsed));
            await Task.Delay(5000)
                .ContinueWith(task => ShowCompletion("Burger", stopwatch.Elapsed));

            ShowCompletion("Order", stopwatch.Elapsed);

            stopwatch.Stop();
        }

        static void ShowCompletion(string name, TimeSpan time)
        {
            Console.WriteLine($"{name} completed after {time}");
        }

In this example code, we are representing the fries, drink and burger tasks as delays of different length. The rest of the code is purely diagnostic, allowing us to get some output and understand the results. Main() also relies on a workaround, described in the previous article, that allows us to call asynchronous code from it.

Here is the output from the above:

Fries completed after 00:00:03.0359621
Drink completed after 00:00:04.0408785
Burger completed after 00:00:09.0426927
Order completed after 00:00:09.0434057

Because we performed each task sequentially, the total order took 9 seconds. In a fast food restaurant, it probably does not make sense to wait for the fries to be ready before preparing the drink, and to wait for both to be ready before starting to prepare the burger. These could be done in parallel, as we will see in the next sections.

However, there are many legitimate cases where sequential task execution makes sense. We’ve seen one in “Motivation for async/await in C#“:

private async void Button_Click(object sender, RoutedEventArgs e)
{
    var baseAddress = new Uri("http://mta.com.mt");
 
    using (var httpClient = new HttpClient() { BaseAddress = baseAddress })
    {
        var response = await httpClient.GetAsync("/");
        var content = await response.Content.ReadAsStringAsync();
 
        MessageBox.Show("Response arrived!", "Slow website");
    }
}

In this case, the tasks are dependent on each other. In order to read the content of the response, the response itself must first have arrived. Because of this dependency, the tasks must be executed one after the other.

Parallel Tasks, All Must Finish

If we fire off the tasks without awaiting them right away, there are more interesting things we can do with them. Essentially, by not awaiting each task as soon as we create it, we let the tasks run concurrently.

        static async void OrderAsync()
        {
            var stopwatch = Stopwatch.StartNew();

            var friesTask = Task.Delay(3000)
                .ContinueWith(task => ShowCompletion("Fries", stopwatch.Elapsed));
            var drinkTask = Task.Delay(1000)
                .ContinueWith(task => ShowCompletion("Drink", stopwatch.Elapsed));
            var burgerTask = Task.Delay(5000)
                .ContinueWith(task => ShowCompletion("Burger", stopwatch.Elapsed));

            await Task.WhenAll(friesTask, drinkTask, burgerTask);

            ShowCompletion("Order", stopwatch.Elapsed);

            stopwatch.Stop();
        }

Aside from removing await before each task, we are assigning them to variables so that we can keep track of them. We then rely on Task.WhenAll() to wait until all tasks have completed (as an analogy, think of it as a barrier, or a join point for threads). Task.WhenAll() is awaitable, unlike its blocking cousin Task.WaitAll(). This gives us a way to easily run asynchronous tasks in parallel where it makes sense to do so.

And in a fast food restaurant, preparing fries and drink while the burger is cooking makes a lot of sense. In fact, the order is ready after just 5 seconds, which is the time of the longest task (the burger). Because the fries and drink were prepared concurrently with the burger, they did not add anything to the total time of the order.

Drink completed after 00:00:01.1696855
Fries completed after 00:00:03.0363008
Burger completed after 00:00:05.0443482
Order completed after 00:00:05.0445130

Note that Task.WhenAll() also has an overload that takes an IEnumerable<Task>, and as such, you can easily pass it a list of tasks (e.g. when the number of tasks is dynamic based on input or data).
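
As a minimal sketch of that dynamic case, the following builds one task per item and awaits them together. PrepareAsync(), PrepareAllAsync() and the fixed 100ms delay are assumptions made purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DynamicOrder
{
    // Hypothetical helper: pretends to prepare an item by waiting for a while.
    public static async Task<string> PrepareAsync(string item)
    {
        await Task.Delay(100);
        return item + " ready";
    }

    // Starts one task per item, then waits for all of them together.
    public static async Task<string[]> PrepareAllAsync(IEnumerable<string> items)
    {
        // ToList() starts every task now, rather than lazily on enumeration.
        List<Task<string>> tasks = items.Select(PrepareAsync).ToList();

        // With Task<T> inputs, WhenAll returns the results
        // in the same order as the input tasks.
        return await Task.WhenAll(tasks);
    }
}
```

Calling await DynamicOrder.PrepareAllAsync(new[] { "Fries", "Drink", "Burger" }) takes roughly as long as a single item, since all three delays run concurrently.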

Parallel Tasks, First To Finish

If you’re hungry and thirsty after an unexpected trip in the desert, it’s unlikely that you’re going to want to wait for all items to finish before starting to eat and drink. Instead, you’ll consume each item as soon as it arrives.

        static async void OrderAsync()
        {
            var stopwatch = Stopwatch.StartNew();

            var friesTask = Task.Delay(3000)
                .ContinueWith(task => ShowCompletion("Fries", stopwatch.Elapsed));
            var drinkTask = Task.Delay(1000)
                .ContinueWith(task => ShowCompletion("Drink", stopwatch.Elapsed));
            var burgerTask = Task.Delay(5000)
                .ContinueWith(task => ShowCompletion("Burger", stopwatch.Elapsed));

            await Task.WhenAny(friesTask, drinkTask, burgerTask);

            ShowCompletion("Order", stopwatch.Elapsed);

            stopwatch.Stop();
        }

Task.WhenAny() will wait until the first task has completed, and then resume execution of the method. It also returns the task that completed (though we’re not using that here).

Drink completed after 00:00:01.0390588
Order completed after 00:00:01.0412190
Fries completed after 00:00:01.0413729
Burger completed after 00:00:01.0413729

Our results are a little messed up. Since Task.WhenAny() only waits for the first task to complete, the entire order was considered complete as soon as the drink was ready. The stopwatch was subsequently stopped, and the output shows 1 second for everything even though the fries and burger actually took longer.

This scenario is useful when you want to retrieve data from different sources and just use the result that arrived fastest. It is not very intuitive for when you’re dying of hunger and want to gobble up everything as it arrives. We’ll address this in the next section.
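
To illustrate the “fastest source wins” scenario, here is a minimal sketch under stated assumptions: FetchFromAsync() and the two mirrors are hypothetical stand-ins for real data sources, with delays simulating their latency.

```csharp
using System;
using System.Threading.Tasks;

public static class FastestSource
{
    // Hypothetical data source: returns a value after a simulated latency.
    public static async Task<string> FetchFromAsync(string source, int latencyMs)
    {
        await Task.Delay(latencyMs);
        return "data from " + source;
    }

    public static async Task<string> GetFastestAsync()
    {
        Task<string> mirrorA = FetchFromAsync("mirror A", 500);
        Task<string> mirrorB = FetchFromAsync("mirror B", 50);

        // WhenAny returns the task that completed first; awaiting that task
        // unwraps its result (or rethrows its exception if it failed).
        Task<string> winner = await Task.WhenAny(mirrorA, mirrorB);
        return await winner;
    }
}
```

Note that the slower task is not cancelled; it keeps running in the background, and its result is simply ignored.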

Parallel Tasks, All Must Finish, Process As They Arrive

So here’s the scenario: we’re famished, and we want to consume our drink, fries and burger as they are ready. We want to consume all of them, but Task.WhenAny() only gives us the first task that completed.

It’s easy to reuse Task.WhenAny() to wait for all tasks to complete, by using a simple loop.

        static async void OrderAsync()
        {
            var stopwatch = Stopwatch.StartNew();

            var friesTask = Task.Delay(3000)
                .ContinueWith(task => ShowCompletion("Fries", stopwatch.Elapsed));
            var drinkTask = Task.Delay(1000)
                .ContinueWith(task => ShowCompletion("Drink", stopwatch.Elapsed));
            var burgerTask = Task.Delay(5000)
                .ContinueWith(task => ShowCompletion("Burger", stopwatch.Elapsed));

            var tasks = new List<Task>() { friesTask, drinkTask, burgerTask };
            
            while (tasks.Count > 0)
            {
                var task = await Task.WhenAny(tasks);
                tasks.Remove(task);

                Console.WriteLine($"Yum! {tasks.Count} left!");
            }

            ShowCompletion("Order", stopwatch.Elapsed);

            stopwatch.Stop();
        }

We’re putting all tasks in a list, and as each task completes, we remove it from the list. We know we’re done when there’s nothing left in the list.

Drink completed after 00:00:01.0506610
Yum! 2 left!
Fries completed after 00:00:03.0328112
Yum! 1 left!
Burger completed after 00:00:05.0317576
Yum! 0 left!
Order completed after 00:00:05.0331167

From this example, it might appear that there’s no benefit from using this approach when compared to just using continuations on tasks and using Task.WhenAll(). However, in real scenarios that don’t involve french fries, it is often reasonable to check the result of each task for failure. If one of the tasks fails, then the operation can be aborted without having to wait for all the other tasks to complete.
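
A minimal sketch of that early abort might look like the following. PrepareAsync(), its fail flag and ConsumeUntilFailureAsync() are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ProcessAsTheyArrive
{
    // Hypothetical work item: completes after a delay, optionally failing.
    public static async Task<string> PrepareAsync(string item, int delayMs, bool fail = false)
    {
        await Task.Delay(delayMs);
        if (fail)
            throw new InvalidOperationException(item + " could not be prepared");
        return item;
    }

    // Returns the items that completed before the first failure (if any).
    public static async Task<List<string>> ConsumeUntilFailureAsync(List<Task<string>> tasks)
    {
        var consumed = new List<string>();
        var pending = new List<Task<string>>(tasks);

        while (pending.Count > 0)
        {
            Task<string> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            if (finished.IsFaulted || finished.IsCanceled)
                break; // abort without waiting for the remaining tasks

            consumed.Add(finished.Result); // does not block: the task has already completed
        }

        return consumed;
    }
}
```

Breaking out of the loop only stops us from waiting; the remaining tasks keep running unless they are cancelled by some other means.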

Task With Timeout

As it turns out, we’re so hungry that we’re only willing to wait up to 4 seconds for each item, since the start of the order. If they take longer than 4 seconds, we’ll cancel that part of the order.

Fortunately, there’s an excellent blog post on the Parallel Programming MSDN blog from 2011 that shows how to write a TimeoutAfter() method that does exactly this. I’ll go ahead and steal it:

    public static class TaskExtensions
    {
        public static async Task TimeoutAfter(this Task task, int millisecondsTimeout)
        {
            if (task == await Task.WhenAny(task, Task.Delay(millisecondsTimeout)))
                await task;
            else
                throw new TimeoutException();
        }
    }

It’s an extension method, so we can easily use it with the tasks we already have:

        static async void OrderAsync()
        {
            var stopwatch = Stopwatch.StartNew();

            var friesTask = Task.Delay(3000).TimeoutAfter(4000)
                .ContinueWith(task => ShowCompletion("Fries", stopwatch.Elapsed));
            var drinkTask = Task.Delay(1000).TimeoutAfter(4000)
                .ContinueWith(task => ShowCompletion("Drink", stopwatch.Elapsed));
            var burgerTask = Task.Delay(5000).TimeoutAfter(4000)
                .ContinueWith(task => ShowCompletion("Burger", stopwatch.Elapsed));

            var tasks = new List<Task>() { friesTask, drinkTask, burgerTask };
            
            while (tasks.Count > 0)
            {
                var task = await Task.WhenAny(tasks);
                tasks.Remove(task);

                Console.WriteLine($"Yum! {tasks.Count} left!");
            }

            ShowCompletion("Order", stopwatch.Elapsed);

            stopwatch.Stop();
        }

Running this, the burger task will time out and an exception will be thrown. Since ContinueWith() runs its continuation whether or not the preceding task failed, and we’re not actually checking for the exception, all we see in the output is that the burger task finished after 4 seconds instead of 5.

Drink completed after 00:00:01.0819761
Yum! 2 left!
Fries completed after 00:00:03.0493526
Yum! 1 left!
Burger completed after 00:00:04.0952924
Yum! 0 left!
Order completed after 00:00:04.0974441

By putting a breakpoint or turning on first chance exceptions, though, we see that the TimeoutException was indeed thrown.
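
Another way to observe the exception is to await the timed-out task inside a try/catch. The sketch below repeats the TimeoutAfter() extension method from earlier so that it compiles on its own (if you already have that method in your project, drop the copy here to avoid an ambiguous call); TryPrepareAsync() is a hypothetical helper added for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class TimeoutDemo
{
    // Same logic as the TimeoutAfter() extension method shown earlier.
    public static async Task TimeoutAfter(this Task task, int millisecondsTimeout)
    {
        if (task == await Task.WhenAny(task, Task.Delay(millisecondsTimeout)))
            await task;
        else
            throw new TimeoutException();
    }

    // Hypothetical helper: reports whether the item was ready in time.
    public static async Task<bool> TryPrepareAsync(int prepareMs, int timeoutMs)
    {
        try
        {
            // Awaiting the task rethrows the TimeoutException, so we can catch it.
            await Task.Delay(prepareMs).TimeoutAfter(timeoutMs);
            return true;
        }
        catch (TimeoutException)
        {
            return false;
        }
    }
}
```

With the timings from the example, TryPrepareAsync(5000, 4000) would correspond to the burger and evaluate to false, whereas the fries and drink would evaluate to true.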

Summary

  1. Awaiting tasks one after another will result in sequential execution.
  2. Use Task.WhenAll() to wait for all tasks to complete before proceeding.
  3. Use Task.WhenAny() to get the first task that finished, and proceed before waiting for the others.
  4. Use Task.WhenAny() in a loop to process all tasks as they arrive, and potentially break out early in case of failure.
  5. Apply a timeout to a task using the TimeoutAfter() extension method from the Parallel Programming blog on MSDN.

Working with Asynchronous Methods in C#

In yesterday’s article, “Motivation for async/await in C#“, we have seen why asynchronous programming is important. We have also seen basic usage of the await keyword, which requires its containing method to be marked as async.

When learning to write asynchronous methods, it is not trivial to get the interactions between various methods (which may or may not be asynchronous) right. In fact, the async void pattern used in yesterday’s examples should normally be reserved for event handlers, and even then, there are caveats to consider.

In this article, we’ll go through various different scenarios in which async/await can be used.

async Task methods

Let’s take another look at the asynchronous (event handler) method from yesterday’s article:

        private async void Button_Click(object sender, RoutedEventArgs e)
        {
            var baseAddress = new Uri("http://mta.com.mt");

            using (var httpClient = new HttpClient() { BaseAddress = baseAddress })
            {
                var response = await httpClient.GetAsync("/");
                var content = await response.Content.ReadAsStringAsync();

                MessageBox.Show("Response arrived!", "Slow website");
            }
        }

Try moving out the code into a separate method, and awaiting it from the event handler. You’ll find that you can’t await an async void method.

In order to be able to await a method, it must return Task (if it doesn’t need to return anything, such as void methods) or Task<T> (if it needs to return a value of type T). We also append an –Async suffix to the method name by convention to make it obvious for people who use such methods that they’re meant to be awaited.

Thus, this example becomes:

        private async void Button_Click(object sender, RoutedEventArgs e)
        {
            await GetHtmlAsync();
        }

        private async Task GetHtmlAsync()
        {
            var baseAddress = new Uri("http://mta.com.mt");

            using (var httpClient = new HttpClient() { BaseAddress = baseAddress })
            {
                var response = await httpClient.GetAsync("/");
                var content = await response.Content.ReadAsStringAsync();

                MessageBox.Show("Response arrived!", "Slow website");
            }
        }

This is an example of an async Task method, which does not return anything. Let’s change it such that it returns the HTML from the response:

        private async void Button_Click(object sender, RoutedEventArgs e)
        {
            string html = await GetHtmlAsync();
        }

        private async Task<string> GetHtmlAsync()
        {
            var baseAddress = new Uri("http://mta.com.mt");

            using (var httpClient = new HttpClient() { BaseAddress = baseAddress })
            {
                var response = await httpClient.GetAsync("/");
                var content = await response.Content.ReadAsStringAsync();

                return content;
            }
        }

We’ve changed the signature of GetHtmlAsync() to return Task<string> instead of just Task. Correspondingly, we are now returning content (a string) from the method. At the event handler, we are now assigning the result of the await into the html variable. Apart from waiting asynchronously until the method completes, await has the additional function of unwrapping the result from the Task that contains it; thus html is of type string.

If you try removing the async keyword from GetHtmlAsync(), you’ll learn a little more about the actual function of the async keyword.

Without async, you are expected to return what the method advertises: a Task<string>. On the other hand, if you mark the method as async, the meaning of the method is changed such that you can return a string directly. The underlying Task-related plumbing is handled by the compiler.

Chaining Asynchronous Methods

In the previous section, we have seen how methods need to return a Task in order to be awaited. Typically, one async Task method will call another and await its result.

The chain of calls ends at a last async Task, typically provided by the .NET Framework or other library, which interfaces directly with I/O (e.g. network or filesystem). It must be an asynchronous method; attempting to disguise a blocking call as async here can lead to deadlocks.

async Task may (and should) be used all the way from an incoming request to the final I/O library method in application types that support top-level asynchronous methods, such as Web API or WCF.

The situation is a little different for other applications (e.g. Windows Forms, WPF) that are event-driven. Asynchronous event handlers are a special case where we need to use async void methods, as we have already seen in the WPF example from yesterday’s article.

async void methods

Event handlers are void methods that are called by the runtime in a dispatcher loop. Thus, async void methods are necessary to allow usage of await within event handlers without requiring them to return a Task.

However, as we have seen before, there is no way to await an async void method. This makes async void methods very dangerous to use outside of their intended context, as I have detailed in “The Dangers of async void Event Handlers“. This is one of the more common mistakes when programming with async/await, and it is good to become familiar with the problems in order to avoid repeating the same mistakes in future.
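
One of those problems concerns exceptions. The following minimal sketch shows the well-behaved case, where an async Task method fails and the caller catches the exception by awaiting it. FailAsync() and CallAndCatchAsync() are hypothetical names for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncVoidVersusTask
{
    // An async Task method: any exception is stored in the returned Task.
    public static async Task FailAsync()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("Something went wrong");
    }

    // Because the caller can await the Task, a regular try/catch works.
    public static async Task<string> CallAndCatchAsync()
    {
        try
        {
            await FailAsync();
            return "no error";
        }
        catch (InvalidOperationException ex)
        {
            return "caught: " + ex.Message;
        }
    }
}
```

Had FailAsync() been declared async void, there would be no Task to await, so this try/catch could not observe the exception. It would instead be raised on whatever context was active when the method started, which typically brings down the process.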

Fake Asynchronous Methods

Sometimes, you’ll have an interface that requires asynchronous methods, yet your implementation does not need anything asynchronous in it. Let’s look at a practical example:

    public interface ISimpleStorage
    {
        Task WriteAsync(string str);
        Task<string> ReadAsync();
    }

You could implement this interface using a simple file as a backing store, in which case your methods will be suitably asynchronous:

    public class FileStorage : ISimpleStorage
    {
        public async Task<string> ReadAsync()
        {
            using (var fs = File.OpenRead("file.txt"))
            using (var sr = new StreamReader(fs))
            {
                var str = await sr.ReadToEndAsync();
                return str;
            }
        }

        public async Task WriteAsync(string str)
        {
            using (var fs = File.OpenWrite("file.txt"))
            using (var sw = new StreamWriter(fs))
            {
                await sw.WriteAsync(str);
                await sw.FlushAsync();
            }
        }
    }

However, you could have another implementation which just uses memory as storage, and in this case there’s nothing asynchronous:

    public class MemoryStorage : ISimpleStorage
    {
        private string str;

        public Task<string> ReadAsync()
        {
            return Task.FromResult(str);
        }

        public Task WriteAsync(string str)
        {
            this.str = str;
            return Task.CompletedTask;
        }
    }

In that case, your methods need not be marked async. However, this means that you will actually need to return Task instances from each method. If you have nothing to return, then just return a Task.CompletedTask (available from .NET Framework 4.6 onwards). You can also use Task.FromResult() to construct a task from a variable that you want to return.
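
From the caller’s point of view, both kinds of implementation look the same. The sketch below repeats the interface and the in-memory implementation so that it compiles on its own (Task.CompletedTask requires .NET Framework 4.6 or later); StorageClient and RoundTripAsync() are hypothetical names added for illustration.

```csharp
using System.Threading.Tasks;

// Repeated from above so that the sketch is self-contained.
public interface ISimpleStorage
{
    Task WriteAsync(string str);
    Task<string> ReadAsync();
}

public class MemoryStorage : ISimpleStorage
{
    private string str;

    public Task<string> ReadAsync()
    {
        return Task.FromResult(str);
    }

    public Task WriteAsync(string str)
    {
        this.str = str;
        return Task.CompletedTask;
    }
}

public static class StorageClient
{
    // The caller awaits the interface methods without knowing (or caring)
    // whether the implementation does any real asynchronous work.
    public static async Task<string> RoundTripAsync(ISimpleStorage storage, string value)
    {
        await storage.WriteAsync(value);
        return await storage.ReadAsync();
    }
}
```

Awaiting a task that has already completed does not suspend the method; execution simply carries on, so the in-memory implementation pays very little for satisfying the asynchronous contract.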

Simplifying Single Line Asynchronous Methods

Consider the following asynchronous method:

        static async Task WaitALittleAsync()
        {
            await Task.Delay(10000);
        }

Here, we are waiting for the delay to finish, and then returning to the caller (there is no result to return, since this is an async Task method).

Instead, we can just return the Task itself, and let the caller do the awaiting:

        static Task WaitALittleAsync()
        {
            return Task.Delay(10000);
        }

Once again, a method does not need to be marked async if (a) it does not await anything, and (b) it can return a Task, rather than some other type that needs to be wrapped in a Task.

Using async/await in Main()

Until recently, you couldn’t use async/await in a console application’s Main() method. You can have an async Task Main() method as from C# 7.1, but you need to make sure you’re using C# 7.1 first, from your project properties -> Build -> Advanced…

You can choose either “C# 7.1”, or “C# latest minor version (latest)”. With that, you can await directly from within Main():

        static async Task Main(string[] args)
        {
            await WaitALittleAsync();
        }

Before C# 7.1, you had to use some other workaround to await from Main(). One option is to use a special AsyncContext such as the one written by Stephen Cleary. Another is to just move asynchrony out of Main() and use a Console.ReadLine() to keep the window open:

        static void Main(string[] args)
        {
            Run();
            Console.ReadLine();
        }

        static async void Run()
        {
            await WaitALittleAsync();
        }
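
A third workaround, not covered above and shown here only as a sketch, is to block on the asynchronous method from a synchronous Main(). This is generally acceptable in a console application, which has no UI thread to freeze. ConsoleApp and MainAsync() are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class ConsoleApp
{
    // Pre-C# 7.1 entry point: synchronous, blocks until the async method finishes.
    public static void Main(string[] args)
    {
        // GetAwaiter().GetResult() rethrows the original exception,
        // whereas Wait() would wrap it in an AggregateException.
        MainAsync(args).GetAwaiter().GetResult();
    }

    private static async Task MainAsync(string[] args)
    {
        await Task.Delay(100);
        Console.WriteLine("Done waiting");
    }
}
```

Unlike the Console.ReadLine() approach, the program exits by itself once MainAsync() completes, and any exception thrown inside it surfaces in Main().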

Summary

  1. Use async Task methods wherever you can.
  2. Use async void methods for event handlers, but beware the dangers even there.
  3. Use the –Async suffix in asynchronous method names to make them easily recognisable as awaitable.
  4. Use Task.CompletedTask and Task.FromResult() to satisfy asynchronous contracts with synchronous code.
  5. Asynchronous code in Main() has not been possible until recently. Use C# 7.1+ or a workaround to get this working.

Motivation for async/await in C#

Introduction

With C# 5 and .NET 4.5, the async and await keywords were introduced. They are now a welcome and ubiquitous part of the .NET developer’s toolkit. However, I often run into developers who are not familiar with this concept, and I have had to explain async/await many times. Thus, I felt it would make sense to write up this explanation so that it can be accessible to all.

async/await is really just syntactic sugar for task continuations, but it makes asynchronous, I/O-bound code very elegant to work with. In fact, it is so popular that it has been spreading to other languages such as JavaScript.

As far as I can remember, async/await was introduced mainly to facilitate building Windows Phone applications. Unlike desktop applications, Windows Phone had introduced a hard limit in which no method could block for more than 50ms. Thus it was no longer possible for developers to resort to blocking their UI, and the need arose to use asynchronous paradigms, for which tasks are a very powerful, but sometimes tedious, option. async/await makes asynchronous code look almost identical to synchronous code.

In this article, I will not go into detail about using Tasks in .NET (which deserves a whole series of articles in itself), but I will give a very basic explanation on how to use them for asynchronous processing. More advanced usage is left for future articles.

Sidenote: Rejoice, for this is the 200th article published on Gigi Labs!

Motivation

Although async/await can be used in all sorts of applications, I like to use WPF applications to show why it’s important. You don’t need to know WPF for this; just follow along.

Let’s create a new WPF application, and simply drag a button from the Toolbox onto the WPF window.

Double-clicking that button will cause a click event handler to be generated in the codebehind class (MainWindow.xaml.cs):

        private void Button_Click(object sender, RoutedEventArgs e)
        {

        }

Now, let’s be evil and toss a Thread.Sleep() in there:

        private void Button_Click(object sender, RoutedEventArgs e)
        {
            Thread.Sleep(15000);
        }

Run the application without debugging (Ctrl+F5) and click the button. Try to interact with the window (e.g. drag it around). What happens?

The UI is completely frozen; you can’t click the button again, drag the window, or close it. That’s because we’ve done something very, very stupid. We have blocked the application’s UI thread.

Let us now replace the Thread.Sleep() with this Task.Delay() instead (notice there is also that async in the method signature, and it’s important):

        private async void Button_Click(object sender, RoutedEventArgs e)
        {
            await Task.Delay(15000);
        }

If you run the application now, you’ll find that you can interact with the window after clicking the button, even though the code appears to be doing practically the same thing as before. So what changed?


Image taken from: Asynchronous Programming with Async and Await (C# and Visual Basic)

The way control flow works with async/await is explained in detail on MSDN, but it may feel a little overwhelming if you’re learning this the first time. Essentially, to understand what is happening, we need to break that await Task.Delay(15000) line into the following steps:

  1. Task.Delay(15000) executes, returning a Task that represents the delay. It does not block, because the delay is driven by a timer rather than by putting the calling thread to sleep.
  2. Because of the await, execution of the remainder of the method is temporarily suspended, and control returns to the caller (here, the UI’s message loop), which is why the window stays responsive. From the method’s own point of view, it’s as if we’re blocking.
  3. Eventually, the Task completes. The await is fulfilled, and the remainder of the method resumes execution.
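
The three steps above can be sketched in code by splitting the await line in two and recording the state of the Task before and after. AwaitSteps and RunAsync() are hypothetical names, and the delay is shortened to keep the example quick.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitSteps
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Step 1: the call returns a Task straight away; nothing blocks here.
        Task delay = Task.Delay(200);
        log.Add("delay started, completed = " + delay.IsCompleted);

        // Step 2: await suspends this method and hands control back to the caller.
        await delay;

        // Step 3: the rest of the method resumes once the Task has finished.
        log.Add("resumed, completed = " + delay.IsCompleted);
        return log;
    }
}
```

A caller that invokes AwaitSteps.RunAsync() without awaiting it gets a Task back at step 2, well before the delay has elapsed, and is free to do other work in the meantime.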

async/await with HttpClient

Using a simple delay is a nice way to get an initial feel of async/await, but it is also not very realistic. async/await works best when you are dealing with I/O requests such as sockets, databases, NoSQL storage, files, REST, etc. To this effect, let’s adapt our example to use an HttpClient.

First, let’s install the following package via NuGet:

Install-Package Microsoft.Net.Http

We’re going to repeat the same experiment as before: first we’ll do a silly blocking implementation, and then we’ll make it asynchronous.

Let’s work with this code that gets the response from the Google homepage:

        private void Button_Click(object sender, RoutedEventArgs e)
        {
            var baseAddress = new Uri("http://www.google.com");

            using (var httpClient = new HttpClient() { BaseAddress = baseAddress })
            {
                var response = httpClient.GetAsync("/").Result;
                var content = response.Content.ReadAsStringAsync().Result;
            }
        }

If we run this and click the button, the window blocks for a fraction of a second, and we get back the HTML from Google.

Google has a very fast response time, so it is a poor example for visualising the problems of blocking code in a UI. Instead, let’s use a website that takes a very long time to load. One that I’ve written about before is that of the Malta Tourism Authority, which takes over 20 seconds to load. Let’s change our base address in the code:

            var baseAddress = new Uri("http://mta.com.mt");

If you run the application now and click the button, you’ll see that it will be stuck for a fairly long time, just like before. We can sort this out by making the request asynchronous. To do that, we replace each .Result property with an await. In order to use await, we have to mark the method as async. In order to get a notification when the response actually comes back, I’ll also toss in a message box:

        private async void Button_Click(object sender, RoutedEventArgs e)
        {
            var baseAddress = new Uri("http://mta.com.mt");

            using (var httpClient = new HttpClient() { BaseAddress = baseAddress })
            {
                var response = await httpClient.GetAsync("/");
                var content = await response.Content.ReadAsStringAsync();

                MessageBox.Show("Response arrived!", "Slow website");
            }
        }

Run the application and click the button. You can interact with the window while the request is processed, and when the response comes back, you’ll get a notification.

If you put a breakpoint and follow along line by line, you’ll see that everything happens sequentially: the content line will not execute before the response has arrived. So async/await is really just giving us the means to make asynchronous code look very similar to normal sequential execution logic, hiding the underlying complexities. However, as we’ll see in later articles, it is also possible to use this abstraction for more powerful scenarios such as parallel task execution.

The Importance of Asynchronous Code

While I have not really explained how to work with async/await in any reasonable depth yet, the importance should now be evident: making I/O calls asynchronous means that your thread does not need to block, and can be freed to do other work. While this has a great impact on user experience in GUI applications, it is also fundamental in other applications such as APIs. Under high load, it is possible to end up with thread starvation when blocking, because all threads are stuck waiting for I/O, when they could otherwise be processing requests.

For pure asynchronous code, blocking isn’t actually ever necessary. When an I/O request such as HttpClient.GetAsync() is fired, .NET (on Windows) uses something called I/O Completion Ports (IOCP) which can monitor incoming I/O and trigger the caller to resume when ready.

We have merely scratched the surface here. There is a lot to be said about async/await, and likewise there are a lot of pitfalls that one must be made aware of. This article has shown why asynchronous code is important, and future articles may cover different aspects in more detail.

The Dangers of async void Event Handlers

When using async/await, you’ll want to use async Task methods most of the time, and use async void methods only for event handlers (see “Async/Await – Best Practices in Asynchronous Programming“, by Stephen Cleary, MSDN Magazine, March 2013).

This conventional wisdom works great if you’re building something like a WPF (GUI) application, and event handlers are invoked occasionally as a result of user interaction (e.g. user presses a button, and an event fires). However, there is another class of event handlers that are invoked as part of a dispatcher loop in a third-party library. async void can be pretty dangerous in these cases.

async void Event Handlers in RabbitMQ

Let’s take the example of RabbitMQ. We’ll set up a basic publisher and consumer. A fundamental property of message queues is that messages are delivered in order, and that’s what we expect to happen.

First, install the RabbitMQ Client library via NuGet:

Install-Package RabbitMQ.Client

Then, we can set up a basic publisher and consumer:

        static void Main(string[] args)
        {
            Console.Title = "async RabbitMQ";

            var factory = new ConnectionFactory();

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                const string queueName = "testqueue";

                // create queue if not already there

                channel.QueueDeclare(queueName, true, false, false, null);

                // publish

                var props = channel.CreateBasicProperties();
                    
                for (int i = 0; i < 5; i++)
                {
                    var msgBytes = Encoding.UTF8.GetBytes("Message " + i);
                    channel.BasicPublish("", queueName, props, msgBytes);
                }

                // set up consumer

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += Consumer_Received;
                channel.BasicConsume(queueName, true, consumer);

                Console.ReadLine();
            }
        }

Our consumer will call the Consumer_Received event handler whenever a message is received. This is the first version of the event handler:

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);

            Console.WriteLine("Began handling " + content);

            Thread.Sleep(1000);

            Console.WriteLine("Finished handling " + content);
        }

If we run this now, the messages are processed one at a time and in order just as we expect:

[Image: rabbitmq-async-sync]

Now, let’s change the event handler to an asynchronous one:

        private static async void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var body = e.Body;
            var content = Encoding.UTF8.GetString(body);

            Console.WriteLine("Began handling " + content);

            await Task.Delay(1000);

            Console.WriteLine("Finished handling " + content);
        }

If we run this now…

[Image: rabbitmq-async-async]

…we see that the handlers now overlap: the one-at-a-time, in-order processing we had before has gone out the window.

Understanding async void, Revisited

In my recent article, “In-Depth Async in Akka .NET: Why We Need PipeTo()”, I explained what happens when you call async void methods. Let’s recap that.

Say we have this program. We’re calling an async void method in a loop.

        static void Main(string[] args)
        {
            Console.Title = "async void";

            for (int i = 0; i < 5; i++)
                RunJob("Job " + i);

            Console.ReadLine();
        }

        static async void RunJob(string str)
        {
            Console.WriteLine("Start " + str);

            await Task.Delay(1000);

            Console.WriteLine("End " + str);
        }

When you call an async void method, it’s done in a fire-and-forget manner. The caller has no way of knowing whether or when the operation ended, so it just resumes execution immediately, rather than waiting for the async void method to finish. So you end up with parallel and interleaved execution such as this:

[Image: rabbitmq-async-asyncvoid]

If we change RunJob() to be synchronous instead…

        static void RunJob(string str)
        {
            Console.WriteLine("Start " + str);

            Thread.Sleep(1000);

            Console.WriteLine("End " + str);
        }

…you’ll see that everything happens one at a time and in order:

[Image: rabbitmq-async-sync2]

So you have to be really careful when using async void:

  1. There is no way for the caller to await completion of the method.
  2. As a result of this, async void calls are fire-and-forget.
  3. Thus it follows that async void methods (including event handlers) will overlap if called in a loop: each call starts before the previous one has finished, so their output is interleaved.
  4. Exceptions can cause the application to crash (see the aforementioned article by Stephen Cleary for more on this).
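For contrast, here is a minimal sketch of the same kind of loop with the job returning a Task instead of void. The names RunJobAsync and RunAllAsync are made up for this illustration and are not part of the original program. Because the caller can await each call, the jobs run one at a time, and an exception thrown inside a job surfaces at the await, where it can be caught.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncTaskDemo
{
    // Hypothetical Task-returning variant of RunJob(). It can be told to
    // fail so that exception propagation can be observed.
    public static async Task RunJobAsync(string str, List<string> log, bool fail = false)
    {
        log.Add("Start " + str);
        await Task.Delay(100);
        if (fail)
            throw new InvalidOperationException(str + " failed");
        log.Add("End " + str);
    }

    public static async Task<List<string>> RunAllAsync()
    {
        var log = new List<string>();

        // Each job is awaited, so the next one starts only after this one ends.
        for (int i = 0; i < 3; i++)
            await RunJobAsync("Job " + i, log);

        try
        {
            await RunJobAsync("Job 3", log, fail: true);
        }
        catch (InvalidOperationException ex)
        {
            // With async void there would be no await to catch this at.
            log.Add("Caught: " + ex.Message);
        }

        return log;
    }

    public static void Main()
    {
        foreach (var line in RunAllAsync().GetAwaiter().GetResult())
            Console.WriteLine(line);
    }
}
```

The log shows each Start followed by its own End, and the failure of Job 3 ends up in the catch block rather than taking down the process.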

Fixing async void event handlers

Despite these problems, if you want to await in your event handler, you have to make it async void. To prevent parallel and interleaved execution, you have to lock. However, you can’t await in a lock block, so you need to use a different synchronisation mechanism such as a semaphore.
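As a rough sketch of the semaphore idea using only the base class library, a SemaphoreSlim with a single permit can stand in for the lock. The class and method names below are hypothetical, and the handler body returns a Task only so that the demo can wait for everything to finish; in a real event handler the same try/finally would sit inside the async void method. Note that the SemaphoreSlim documentation does not promise that waiters are released in arrival order, so this sketch should be read as guaranteeing one-at-a-time execution rather than strict ordering.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SerializedHandlerDemo
{
    // A single permit means only one handler body runs at a time.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);

    // Hypothetical stand-in for the body of an async void event handler.
    public static async Task HandleAsync(string content, List<string> log)
    {
        await Gate.WaitAsync();          // asynchronous counterpart of entering a lock
        try
        {
            log.Add("Began handling " + content);
            await Task.Delay(100);
            log.Add("Finished handling " + content);
        }
        finally
        {
            Gate.Release();              // always release, even if the body throws
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Start all handlers without awaiting them, as a dispatcher loop would.
        var pending = Enumerable.Range(0, 3)
            .Select(i => HandleAsync("Message " + i, log))
            .ToList();

        await Task.WhenAll(pending);
        return log;
    }

    public static void Main()
    {
        foreach (var line in RunAsync().GetAwaiter().GetResult())
            Console.WriteLine(line);
    }
}
```

Every "Began handling" line is immediately followed by the matching "Finished handling" line, which is the property the plain async void handler lost.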

My own Dandago.Utilities provides a ScopedAsyncLock that allows you to neatly wrap the critical section in a using block:

        private static ScopedAsyncLockFactory factory = new ScopedAsyncLockFactory();

        private static async void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            using (var scopedLock = await factory.CreateLockAsync())
            {
                var body = e.Body;
                var content = Encoding.UTF8.GetString(body);

                Console.WriteLine("Began handling " + content);

                await Task.Delay(1000);

                Console.WriteLine("Finished handling " + content);
            }
        }

Like this, messages are consumed one at a time, and in order:

[Image: rabbitmq-async-scopedasynclock]

ScopedAsyncLockFactory uses a semaphore underneath, so don’t forget to dispose it!

Asynchronous and Concurrent Processing in Akka .NET Actors

Yesterday, I wrote a long article about asynchronous processing in Akka .NET actors, the dangers of async void methods, and PipeTo().

The article was written as a result of outdated Akka .NET documentation which claimed that async/await was evil in Akka .NET actors and recommended using PipeTo() instead (the documentation has since been updated). This was further exacerbated by the confusion between asynchronous and concurrent processing in the documentation.

I would like to thank Aaron Stannard and Roger Alsing from the Akka .NET team who, via separate channels, clarified a lot of the confusion. This new article covers asynchronous and concurrent processing in Akka .NET actors as a result.

Asynchronous vs Concurrent Processing

A lot of people confuse asynchronous and concurrent processing, more so given that you can do both with .NET Tasks.

This is asynchronous:

        static async Task RunAsync()
        {
            await Task.Delay(1000);
        }

RunAsync begins executing, but is suspended while waiting for some external operation to complete (typically I/O such as reading from a database or calling a REST service). The method resumes when the operation completes. It does not run any of its own remaining code in the meantime, although the thread that called it is free to do other work.

This, on the other hand, is concurrent:

        static async Task RunAsync()
        {
            Task.Delay(1000);
        }

Because the task is not awaited, the method proceeds with its execution while the task is running. In fact, we get a nice big warning when we do this in a method marked as async:

[Image: akkanet-async-not-awaited]

If we extend our example just a little bit, we can understand the behaviour better:

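        static async Task RunAsync()
        {
            // Not awaited: the continuation runs about a second later,
            // after this method has already returned.
            Task.Delay(1000)
                .ContinueWith(x => Console.WriteLine("After Delay"));
            Console.WriteLine("End of RunAsync()");
        }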

The output is as follows:

End of RunAsync()
After Delay

Now, I could go on about definitions of synchrony and asynchrony, concurrency and parallelism, interleaving, and task-switching. But that would bore you to tears, and it’s really not the point here.

The important thing is to realise that despite using very similar C# syntax, we’re doing two very different things here. And I need to make this clear because PipeTo() is really targeted at concurrent processing, although it is described within the context of asynchrony in Akka .NET documentation.
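To make the difference concrete, here is a small sketch that records the order of events in both cases. The names StepAsync, SequentialAsync and ConcurrentAsync are invented for this illustration. In the first method each step is awaited before the next one starts; in the second, both steps are started first and only then awaited together.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class AwaitVsStartDemo
{
    // Hypothetical unit of work: logs when it starts and when it ends.
    static async Task StepAsync(string name, int delayMs, ConcurrentQueue<string> log)
    {
        log.Enqueue(name + " start");
        await Task.Delay(delayMs);
        log.Enqueue(name + " end");
    }

    // Asynchronous but sequential: B does not start until A has finished.
    public static async Task<string[]> SequentialAsync()
    {
        var log = new ConcurrentQueue<string>();
        await StepAsync("A", 100, log);
        await StepAsync("B", 300, log);
        return log.ToArray();
    }

    // Concurrent: both steps are started before either one is awaited.
    public static async Task<string[]> ConcurrentAsync()
    {
        var log = new ConcurrentQueue<string>();
        Task a = StepAsync("A", 100, log);
        Task b = StepAsync("B", 300, log);
        await Task.WhenAll(a, b);
        return log.ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", SequentialAsync().GetAwaiter().GetResult()));
        Console.WriteLine(string.Join(", ", ConcurrentAsync().GetAwaiter().GetResult()));
    }
}
```

The sequential version logs A start, A end, B start, B end, while the concurrent version logs A start, B start, A end, B end.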

async/await in actors

You can do async/await in an actor by using the ReceiveActor’s ReceiveAsync() method:

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            Console.WriteLine($"Begin {str}");

            await Task.Delay(2000);

            Console.WriteLine($"End {str}");
        }
    }

This is perfectly valid, and you will most likely resort to this when you need a guarantee on message order. In such situations, you don’t want to start processing the next message while waiting for an I/O operation to complete, as it is possible for the results of an older message to overwrite those of a newer message.

It is also very useful for sequential steps in I/O operations that depend directly on each other, such as when talking to a database:

        public async Task HandleAsync(string str)
        {
            const string connStr = @"Data Source=.\SQLEXPRESS;Database=test;Integrated Security=true";
            using (var conn = new SqlConnection(connStr))
            {
                await conn.OpenAsync();

                const string sql = "select * from person;";

                using (var command = new SqlCommand(sql, conn))
                using (var reader = await command.ExecuteReaderAsync())
                {
                    while (await reader.ReadAsync())
                    {
                        string id = reader["id"].ToString();
                        string name = reader["name"].ToString();

                        Console.WriteLine($"{id} {name}");
                    }
                }
            }
        }

However, this comes at a cost. First, there is a performance impact when compared to synchronous execution, because the Akka .NET pipeline needs to carry the current context across asynchronous steps.

Secondly, the actor will not be able to process the next message until the current one is finished. As noted above, this is exactly what you want when you need a guarantee on message order. But if your message processing does not depend on prior state, you will get much more throughput if you run tasks concurrently and use PipeTo() to collect the results (more on this later).

Using Ask()

Ask() lets you do request/response between actors:

    public class ServiceActor : ReceiveActor
    {
        public ServiceActor()
        {
            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            await Task.Delay(2000);
            Sender.Tell(str + " done");
        }
    }

    public class MyActor : ReceiveActor
    {
        private IActorRef serviceActor;

        public MyActor(IActorRef serviceActor)
        {
            this.serviceActor = serviceActor;

            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            Console.WriteLine($"Begin {str}");

            var result = await this.serviceActor.Ask(str);

            Console.WriteLine($"End {result}");
        }
    }

Here is the output for this:

[Image: akkanet-async-ask]

The approach above is something you typically want to avoid, for the same reason outlined in the previous section. If your actor is waiting for a response, it can’t process other messages in the meantime. Most of the time you should spawn a concurrent task as shown in the documentation, unless you have a good reason for not wanting to process the next message before the current one has finished. Try to design your system in a push fashion, rather than request/response.
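Outside of an actor, for example in a console program’s entry point, there is no mailbox to block, so Ask() is a reasonable way to get a reply back from the actor system. The following is a minimal sketch of that, assuming the Akka NuGet package is installed; the system name "demo", the actor name "service" and the AskDemo class are arbitrary choices for this example. It uses the generic Ask<T>() overload with a timeout so that a lost reply surfaces as a failure.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

// Same shape as the ServiceActor above, with a shorter delay.
public class ServiceActor : ReceiveActor
{
    public ServiceActor()
    {
        ReceiveAsync<string>(async str =>
        {
            await Task.Delay(200);
            Sender.Tell(str + " done");
        });
    }
}

public static class AskDemo
{
    public static async Task<string> RunAsync()
    {
        // "demo" and "service" are arbitrary names chosen for this sketch.
        using (var system = ActorSystem.Create("demo"))
        {
            var service = system.ActorOf(Props.Create<ServiceActor>(), "service");

            // Typed reply plus a timeout: if no reply arrives in time, the task fails.
            return await service.Ask<string>("Job 1", TimeSpan.FromSeconds(5));
        }
    }

    public static void Main()
    {
        Console.WriteLine(RunAsync().GetAwaiter().GetResult());
    }
}
```

Here the wait happens in the calling code rather than inside an actor’s message handler, so no actor is held up while the reply is pending.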

Concurrent Execution and PipeTo()

If you have no reason to process messages in a strictly sequential manner, then you can do long-running tasks and I/O operations in a spawned task.

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.Receive<string>(x => Handle(x));
        }

        public void Handle(string str)
        {
            Task.Run(async () =>
            {
                Console.WriteLine($"Begin {str}");

                await Task.Delay(2000);

                Console.WriteLine($"End {str}");
            });
        }
    }

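Because the handler returns as soon as the task has been spawned, the actor moves straight on to the next message. The actual processing you do within the tasks will therefore be interleaved, as in yesterday’s article:

[Image: akkanet-async-concurrent]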

But this is okay, because we’re working on the assumption that the messages don’t need to be processed strictly in sequence.

Now if you want to send the result of your concurrent tasks somewhere, you can do that with PipeTo():

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.Receive<string>(x => Handle(x));
            this.Receive<int>(x => Handle(x));
        }

        public void Handle(string str)
        {
            Task.Run(async () =>
            {
                Console.WriteLine($"Begin {str}");

                await Task.Delay(2000);

                Console.WriteLine($"End {str}");

                return 42;
            }).PipeTo(Self);
        }

        public void Handle(int result)
        {
            Console.WriteLine($"Got result: {result}");
        }
    }

The result of the concurrent operation is sent to the actor you specify (in this case to itself) and processed as any other message in its mailbox. You can also do post-processing (e.g. check HTTP status code after an HTTP GET operation) by adding a ContinueWith(); see the PipeTo() article on the Petabridge blog for an example.
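As a sketch of the ContinueWith() idea (not the Petabridge example itself), the continuation below turns either the result or the failure of the task into a message before it is piped. This assumes the Akka NuGet package; LengthActor and PipeToDemo are hypothetical names, and the result is piped to the sender rather than to Self so that the demo can collect it with Ask().

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

public class LengthActor : ReceiveActor
{
    public LengthActor()
    {
        Receive<string>(str =>
        {
            // Capture Sender now: it is only valid while this message is being handled.
            var replyTo = Sender;

            Task.Run(async () =>
                {
                    await Task.Delay(200);   // stands in for an I/O call
                    if (str.Length == 0)
                        throw new ArgumentException("empty input");
                    return str.Length;
                })
                // Post-processing: map success or failure to a message.
                .ContinueWith(t => t.IsFaulted
                    ? (object)("failed: " + t.Exception.GetBaseException().Message)
                    : (object)("length = " + t.Result))
                .PipeTo(replyTo);
        });
    }
}

public static class PipeToDemo
{
    public static async Task<string[]> RunAsync()
    {
        using (var system = ActorSystem.Create("demo"))
        {
            var actor = system.ActorOf(Props.Create<LengthActor>());
            var ok = await actor.Ask<string>("hello", TimeSpan.FromSeconds(5));
            var bad = await actor.Ask<string>("", TimeSpan.FromSeconds(5));
            return new[] { ok, bad };
        }
    }

    public static void Main()
    {
        foreach (var line in RunAsync().GetAwaiter().GetResult())
            Console.WriteLine(line);
    }
}
```

The actor itself never waits: it hands the work to a task and is immediately free to handle the next message, while the reply arrives later as an ordinary message.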

More Advanced Concurrent Operations

Given that you can use both tasks and async/await in your actors, you can use all of the typical patterns you would normally use with the Task Parallel Library (TPL).

Here’s an example that simulates aggregating data from multiple external resources:

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.ReceiveAsync<string>(x => HandleAsync(x));
        }

        public async Task HandleAsync(string str)
        {
            var task1 = Task.Delay(1000).ContinueWith(x => { return 1; });
            var task2 = Task.Delay(2000).ContinueWith(x => { return 2; });
            var task3 = Task.Delay(3000).ContinueWith(x => { return 3; });

            var results = await Task.WhenAll<int>(task1, task2, task3);
            var sum = results.Sum();
            Console.WriteLine(sum);
        }
    }

WhenAll() will wait for all the tasks to complete before the method can proceed with its execution. Here’s the output:

[Image: akkanet-async-whenall]

Here’s another example which takes the result of whichever task completes first:

        public async Task HandleAsync(string str)
        {
            var task1 = Task.Delay(1000).ContinueWith(x => { return 1; });
            var task2 = Task.Delay(2000).ContinueWith(x => { return 2; });
            var task3 = Task.Delay(3000).ContinueWith(x => { return 3; });

            var result = await await Task.WhenAny<int>(task1, task2, task3);
            Console.WriteLine(result);
        }

In this example, WhenAny() suspends execution of the method until any of the tasks completes. The result from the fastest task is taken.

[Image: akkanet-async-whenany]
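One thing worth keeping in mind with WhenAny() is that the slower tasks keep running after the first one completes. If their results are no longer needed, they can be cancelled. Here is a minimal sketch of that outside of an actor, using only the base class library; QueryAsync and FirstResultAsync are hypothetical names, and the delays simply stand in for real queries.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FirstCompletedDemo
{
    // Hypothetical query: returns its value after the given delay,
    // or is cancelled if the token fires first.
    static async Task<int> QueryAsync(int value, int delayMs, CancellationToken token)
    {
        await Task.Delay(delayMs, token);
        return value;
    }

    public static async Task<int> FirstResultAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            var tasks = new[]
            {
                QueryAsync(1, 100, cts.Token),
                QueryAsync(2, 300, cts.Token),
                QueryAsync(3, 500, cts.Token)
            };

            // WhenAny() yields the task that finished first, not its result.
            Task<int> winner = await Task.WhenAny(tasks);

            // The remaining queries are no longer needed, so cancel them.
            cts.Cancel();

            // Awaiting the winner unwraps its result (or rethrows its exception).
            return await winner;
        }
    }

    public static void Main()
    {
        Console.WriteLine(FirstResultAsync().GetAwaiter().GetResult());
    }
}
```

With these delays the first query wins, so the method returns 1 and the other two delays are cancelled rather than left to run to completion.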

Note: if you’re looking to do this kind of concurrent fastest-query operation, you might want to look at Akka .NET Routers with Routing Strategies such as ScatterGatherFirstCompleted.