Asynchronous and Concurrent Processing in Akka .NET Actors

Yesterday, I wrote a long article about asynchronous processing in Akka .NET actors, the dangers of async void methods, and PipeTo().

I wrote that article in response to outdated Akka .NET documentation, which claimed that async/await was evil in Akka .NET actors and recommended using PipeTo() instead (the documentation has since been updated). The documentation also conflated asynchronous and concurrent processing, which made matters worse.

I would like to thank Aaron Stannard and Roger Alsing from the Akka .NET team who, via separate channels, clarified a lot of the confusion. This new article covers asynchronous and concurrent processing in Akka .NET actors as a result.

Asynchronous vs Concurrent Processing

A lot of people confuse asynchronous and concurrent processing, more so given that you can do both with .NET Tasks.

This is asynchronous:

        static async Task RunAsync()
        {
            await Task.Delay(1000);
        }

RunAsync begins executing, but is suspended while it waits for some external operation to complete (typically I/O such as reading from a database or calling a REST service). It resumes when the operation completes. In the meantime the method itself does nothing further, although the thread that was running it is free to do other work.

This, on the other hand, is concurrent:

        static async Task RunAsync()
        {
            Task.Delay(1000);
        }

Because the task is not awaited, the method proceeds with its execution while the task is running. In fact, we get a nice big warning when we do this in a method marked as async:

[Screenshot: compiler warning CS4014 for the call that is not awaited (akkanet-async-not-awaited)]

If we extend our example just a little bit, we can understand the behaviour better:

        static async Task RunAsync()
        {
            Task.Delay(1000)
                .ContinueWith(x => Console.WriteLine("After Delay"));
            Console.WriteLine("End of RunAsync()");
        }

The output is as follows:

End of RunAsync()
After Delay

Now, I could go on about definitions of synchrony and asynchrony, concurrency and parallelism, interleaving, and task-switching. But that would bore you to tears, and it’s really not the point here.

The important thing is to realise that despite using very similar C# syntax, we’re doing two very different things here. And I need to make this clear because PipeTo() is really targeted at concurrent processing, although it is described within the context of asynchrony in Akka .NET documentation.
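To make the difference concrete, here is a minimal sketch that times both variants. The method names AwaitedAsync and NotAwaited are my own for illustration, and the timings in the comments are approximate rather than measured output:

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class Program
{
    // Asynchronous: the method is suspended at the await and only
    // continues once the delay has completed.
    static async Task AwaitedAsync()
    {
        await Task.Delay(1000);
    }

    // Concurrent: the delay is started but not awaited, so the method
    // returns immediately while the delay is still running.
    static Task NotAwaited()
    {
        _ = Task.Delay(1000); // discard makes the fire-and-forget explicit
        return Task.CompletedTask;
    }

    public static async Task Main()
    {
        var stopwatch = Stopwatch.StartNew();
        await AwaitedAsync();
        Console.WriteLine($"Awaited: {stopwatch.ElapsedMilliseconds} ms");     // about 1000 ms

        stopwatch.Restart();
        await NotAwaited();
        Console.WriteLine($"Not awaited: {stopwatch.ElapsedMilliseconds} ms"); // close to 0 ms
    }
}
```

The caller of the first method has to wait out the full delay; the caller of the second carries on straight away and never learns when (or whether) the delay finished.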

async/await in actors

You can do async/await in an actor by using the ReceiveActor’s ReceiveAsync() method:

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            Console.WriteLine($"Begin {str}");

            await Task.Delay(2000);

            Console.WriteLine($"End {str}");
        }
    }

This is perfectly valid, and you will most likely resort to this when you need a guarantee on message order. In such situations, you don’t want to start processing the next message while waiting for an I/O operation to complete, as it is possible for the results of an older message to overwrite those of a newer message.

It is also very useful for sequential steps in I/O operations that depend directly on each other, such as when talking to a database:

        public async Task HandleAsync(string str)
        {
            const string connStr = @"Data Source=.\SQLEXPRESS;Database=test;Integrated Security=true";
            using (var conn = new SqlConnection(connStr))
            {
                await conn.OpenAsync();

                const string sql = "select * from person;";

                using (var command = new SqlCommand(sql, conn))
                using (var reader = await command.ExecuteReaderAsync())
                {
                    while (await reader.ReadAsync())
                    {
                        string id = reader["id"].ToString();
                        string name = reader["name"].ToString();

                        Console.WriteLine($"{id} {name}");
                    }
                }
            }
        }

However, this comes at a cost. First, there is a performance impact when compared to synchronous execution, because the Akka .NET pipeline needs to carry the current context across asynchronous steps.

Secondly, the actor will not be able to process the next message until the current one is finished. Sometimes, this is exactly what you want, such as when you need the guarantee on message order described above. But if your message processing does not depend on prior state, you will get much more throughput if you run tasks concurrently and use PipeTo() to collect the results (more on this later).

Using Ask()

Ask() lets you do request/response between actors:

    public class ServiceActor : ReceiveActor
    {
        public ServiceActor()
        {
            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            await Task.Delay(2000);
            Sender.Tell(str + " done");
        }
    }

    public class MyActor : ReceiveActor
    {
        private IActorRef serviceActor;

        public MyActor(IActorRef serviceActor)
        {
            this.serviceActor = serviceActor;

            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            Console.WriteLine($"Begin {str}");

            var result = await this.serviceActor.Ask(str);

            Console.WriteLine($"End {result}");
        }
    }

Here is the output for this:

[Screenshot of console output: akkanet-async-ask]

The approach above is something you typically want to avoid, for the same reason outlined in the previous section. If your actor is waiting for a response, it can’t process other messages in the meantime. Most of the time you should spawn a concurrent task as shown in the documentation, unless you have a good reason for not wanting to process the next message before the current one has finished. Try to design your system in a push fashion, rather than request/response.
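As a rough sketch of the non-blocking alternative, the actor can start the Ask() without awaiting it and pipe the reply back to itself. The ServiceReply wrapper class and the 5-second timeout are my own assumptions for illustration; they are not part of the example above:

```csharp
using System;
using Akka.Actor;

public class MyActor : ReceiveActor
{
    // Hypothetical wrapper message, so that the reply can be told apart
    // from the incoming string requests.
    public sealed class ServiceReply
    {
        public ServiceReply(string text) { Text = text; }
        public string Text { get; }
    }

    private readonly IActorRef serviceActor;

    public MyActor(IActorRef serviceActor)
    {
        this.serviceActor = serviceActor;

        this.Receive<string>(str =>
        {
            Console.WriteLine($"Begin {str}");

            // Start the request without awaiting it. The reply arrives
            // later as a ServiceReply message in this actor's mailbox.
            this.serviceActor
                .Ask<string>(str, TimeSpan.FromSeconds(5))
                .PipeTo(Self, success: reply => new ServiceReply(reply));
        });

        this.Receive<ServiceReply>(reply => Console.WriteLine($"End {reply.Text}"));
    }
}
```

With this shape the actor is free to pick up the next string while earlier requests are still outstanding, so the Begin and End lines may interleave. If the Ask() times out, the failure is piped to the actor as well, and this sketch does not handle it.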

Concurrent Execution and PipeTo()

If you have no reason to process messages in a strictly sequential manner, then you can do long-running tasks and I/O operations in a spawned task.

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.Receive<string>(x => Handle(x));
        }

        public void Handle(string str)
        {
            Task.Run(async () =>
            {
                Console.WriteLine($"Begin {str}");

                await Task.Delay(2000);

                Console.WriteLine($"End {str}");
            });
        }
    }

Because of this, the actual processing you do within the task will be interleaved, as in yesterday’s article:

[Screenshot of console output: akkanet-async-concurrent]

But this is okay, because we’re working on the assumption that the messages don’t need to be processed strictly in sequence.

Now if you want to send the result of your concurrent tasks somewhere, you can do that with PipeTo():

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.Receive<string>(x => Handle(x));
            this.Receive<int>(x => Handle(x));
        }

        public void Handle(string str)
        {
            Task.Run(async () =>
            {
                Console.WriteLine($"Begin {str}");

                await Task.Delay(2000);

                Console.WriteLine($"End {str}");

                return 42;
            }).PipeTo(Self);
        }

        public void Handle(int result)
        {
            Console.WriteLine($"Got result: {result}");
        }
    }

The result of the concurrent operation is sent to the actor you specify (in this case to itself) and processed as any other message in its mailbox. You can also do post-processing (e.g. check HTTP status code after an HTTP GET operation) by adding a ContinueWith(); see the PipeTo() article on the Petabridge blog for an example.
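For illustration, here is a minimal sketch of that kind of post-processing. It is my own example rather than the one from the Petabridge article, and the FetchActor and FetchResult names are hypothetical:

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Akka.Actor;

public class FetchActor : ReceiveActor
{
    // Hypothetical result message produced by the continuation.
    public sealed class FetchResult
    {
        public FetchResult(string url, bool success) { Url = url; Success = success; }
        public string Url { get; }
        public bool Success { get; }
    }

    private static readonly HttpClient Http = new HttpClient();

    public FetchActor()
    {
        this.Receive<string>(url =>
        {
            Http.GetAsync(url)
                .ContinueWith(t =>
                {
                    // Runs outside the actor, so it only touches the task
                    // and the captured url, never the actor's state.
                    bool ok = t.Status == TaskStatus.RanToCompletion
                              && t.Result.IsSuccessStatusCode;
                    return new FetchResult(url, ok);
                })
                .PipeTo(Self);
        });

        this.Receive<FetchResult>(r =>
        {
            string outcome = r.Success ? "OK" : "failed";
            Console.WriteLine($"{r.Url}: {outcome}");
        });
    }
}
```

The continuation turns both HTTP error codes and faulted requests into an ordinary message, so the actor only ever has to handle a FetchResult.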

More Advanced Concurrent Operations

Given that you can use both tasks and async/await in your actors, you can use all of the typical patterns you would normally use with the Task Parallel Library (TPL).

Here’s an example that stands in for aggregating data from multiple external resources:

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.ReceiveAsync<string>(x => HandleAsync(x));
        }

        public async Task HandleAsync(string str)
        {
            var task1 = Task.Delay(1000).ContinueWith(x => { return 1; });
            var task2 = Task.Delay(2000).ContinueWith(x => { return 2; });
            var task3 = Task.Delay(3000).ContinueWith(x => { return 3; });

            var results = await Task.WhenAll<int>(task1, task2, task3);
            var sum = results.Sum();
            Console.WriteLine(sum);
        }
    }

WhenAll() will wait for all the tasks to complete before the method can proceed with its execution. Here’s the output:

[Screenshot of console output: akkanet-async-whenall]

Here’s another example which takes the result of whichever task completes first:

        public async Task HandleAsync(string str)
        {
            var task1 = Task.Delay(1000).ContinueWith(x => { return 1; });
            var task2 = Task.Delay(2000).ContinueWith(x => { return 2; });
            var task3 = Task.Delay(3000).ContinueWith(x => { return 3; });

            var result = await await Task.WhenAny<int>(task1, task2, task3);
            Console.WriteLine(result);
        }

In this example, WhenAny() suspends execution of the method until any of the tasks completes. The result from the fastest task is taken.

[Screenshot of console output: akkanet-async-whenany]

Note: if you’re looking to do this kind of concurrent fastest-query operation, you might want to look at Akka .NET Routers with Routing Strategies such as ScatterGatherFirstCompleted.
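A rough sketch of what that might look like with a pool router follows. The QueryWorker actor, the pool size of 3 and the timeouts are assumptions of mine for illustration, so check the router documentation for the exact options before relying on this:

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;

// Hypothetical worker: replies to each query after a random delay.
public class QueryWorker : ReceiveActor
{
    private readonly Random rng = new Random();

    public QueryWorker()
    {
        this.ReceiveAsync<string>(async query =>
        {
            await Task.Delay(rng.Next(500, 3000));
            Sender.Tell($"{query} answered by {Self.Path.Name}");
        });
    }
}

public static class Program
{
    public static async Task Main()
    {
        using (var system = ActorSystem.Create("demo"))
        {
            // Pool of 3 routees: every message goes to all of them, and the
            // first reply within 5 seconds is passed back to the asker.
            var router = system.ActorOf(
                Props.Create<QueryWorker>()
                     .WithRouter(new ScatterGatherFirstCompletedPool(3, TimeSpan.FromSeconds(5))),
                "query-router");

            var fastest = await router.Ask<string>("lookup", TimeSpan.FromSeconds(6));
            Console.WriteLine(fastest);
        }
    }
}
```

Compared to WhenAny(), the fan-out and the choice of the first reply are handled by the router, so the calling code only sees a single request and a single response.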