All posts by Gigi

Interview: Virtual Actors with Microsoft Orleans

Introduction

In recent years, the ever-increasing demand for computing resources has rendered traditional single-threaded programming inadequate for many modern applications. Faced with heavy performance and scalability challenges, many developers are forced to turn to concurrent and distributed programming.

While multithreaded programming has been in use for many years, those who have used it will know that building a performant shared-memory system free of race conditions can be very difficult.

It is possible to avoid the complications of shared memory, and of explicit multithreading, by using a message-passing system. The actor model is one such approach: processing is done by a large number of single-threaded actors, which communicate by sending each other asynchronous messages.
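To make the idea concrete, here is a minimal sketch in plain C# (it uses neither Orleans nor Akka .NET) of a single actor: a private mailbox drained by exactly one worker, so the actor's state is only ever touched by one thread and callers interact with it purely by posting messages. The `CounterActor` class and its messages are hypothetical, chosen only for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical example: an actor that counts the string messages sent to it.
// Its state (count) is only read or written by the single worker task,
// so no locks are needed around it.
public sealed class CounterActor : IDisposable
{
    private readonly BlockingCollection<object> mailbox = new BlockingCollection<object>();
    private readonly Task worker;
    private int count; // private state, owned by the worker

    public CounterActor()
    {
        // One consumer: messages are processed one at a time, in arrival order.
        worker = Task.Run(() =>
        {
            foreach (var message in mailbox.GetConsumingEnumerable())
                Handle(message);
        });
    }

    // Asynchronous send: enqueue the message and return immediately.
    public void Tell(object message) => mailbox.Add(message);

    private void Handle(object message)
    {
        switch (message)
        {
            case string _:
                count++;
                break;
            case TaskCompletionSource<int> reply: // a "what is your count?" request
                reply.SetResult(count);
                break;
        }
    }

    // Queries are messages too; the answer comes back through the task.
    public Task<int> GetCountAsync()
    {
        var reply = new TaskCompletionSource<int>();
        Tell(reply);
        return reply.Task;
    }

    public void Dispose()
    {
        mailbox.CompleteAdding(); // let the worker drain any remaining messages
        worker.Wait();
        mailbox.Dispose();
    }
}
```

Because the count request is queued behind the messages sent before it, it always observes their effect, without the caller ever touching the actor's state directly.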

As it turns out, Microsoft have their own actor model, and it’s called Orleans. Sergey Bykov (SB), Principal Software Development Engineer Lead at Microsoft, and project lead of the Orleans project, has very kindly agreed to answer my (DD) questions about Orleans.

Orleans Overview

DD: What is Microsoft Orleans?

SB: The home page of our docs says the following.

“Orleans is a framework that provides a straightforward approach to building distributed high-scale computing applications, without the need to learn and apply complex concurrency or other scaling patterns. It was created by Microsoft Research and designed for use in the cloud.

“Orleans has been used extensively in Microsoft Azure by several Microsoft product groups, most notably by 343 Industries as a platform for all of Halo 4 and Halo 5 cloud services, as well as by a growing number of other companies.”

SB: In other words, Orleans provides a programming model (backed by the Orleans runtime) for building distributed scalable applications almost as easily as single machine apps. The goal of the project from the beginning was to democratize cloud development by making a broad range of developers with little to no distributed systems expertise productive and successful in building scalable distributed systems in general, and cloud services in particular.

The introduction in our docs explains that Orleans is built around a distributed actor model, and that the key innovation there is the notion of Virtual Actors. A detailed description is in our publication.

DD: Out of curiosity, why the name, ‘Orleans’?

SB: It was a general rule within Microsoft that codenames should be chosen from geographical names, like names of cities, because those aren’t trademarkable. Over time, the Orleans codename accrued enough brand recognition that we decided to stick with it when we went open source.

DD: Tell us a little about the history of Microsoft Orleans.

SB: Orleans started in 2009 as a research project within a new Microsoft Research lab that was eventually named eXtreme Computing Group (XCG; it was later merged with MSR’s Redmond lab). The goal of the project was to create something that would qualitatively simplify creating software for the cloud. The two major challenges we focused on were 1) the complexity of building distributed systems, which has traditionally been the domain of a relatively small population of expert developers; and 2) the pattern of major re-architectures required of nearly every successful web property as it experienced exponential growth of its user base.

We set out to build a framework with a programming model that would make mainstream single-machine developers productive in the cloud, and that would help build systems and services that could easily scale through several orders of magnitude of growth. While focusing on mainstream developers, we wanted Orleans to be just as appealing to expert developers, by reducing the amount of low-level ceremony they have to deal with. As we went through several early prototypes and iterations, we learned quite a bit from building the first Orleans applications, and even more so when we started collaborating with internal product groups. The programming model evolved, and we arrived at what we ended up naming the “Virtual Actor Model”.

Using Orleans

DD: How does Microsoft Orleans compare with other actor models?

SB: The Actor Model is quite old, and there are many implementations of it. There’s a much smaller number of Distributed Actor Model solutions available. The most popular ones are Erlang/OTP and its JVM “younger sibling” Akka. Erlang and Akka organically grew from single-process actor libraries into multi-machine scenarios by gradually adding remoting and distribution features. They brought along the fault tolerance model of hierarchical supervision trees, which is easy to use within a single process and acceptable for small-scale fixed topologies, but difficult to manage at cloud scale, especially for developers with limited distributed systems experience.

The Virtual Actor Model of Orleans removed a lot of coordination and fault tolerance complexity from developers’ shoulders by providing an intuitive notion of actors that don’t need to be created, destroyed or looked up. The “Virtual” qualifier comes from the analogy with virtual memory. Actors in Orleans live an “eternal” life, always available to process a call, and the Orleans runtime is responsible for instantiating their physical “incarnations” in memory on an as-needed basis, and for removing idle ones to free up resources. The Orleans runtime also transparently handles server failures by keeping track of instantiated actors and recreating them on a different server when needed. As a result, the developer writes much less code (we’ve received anecdotal reports of a 3-5 times reduction in code, up to 10 times in some cases) and much simpler code, free from data races and complex distributed coordination logic.

The effort of Orleans to ‘democratize’ distributed programming and to raise developer productivity received an endorsement of sorts from the inventor of the Actor Model, Carl Hewitt. In his recent publication, “Actor Model of Computation for Scalable Robust Information Systems”, he wrote: “Orleans is an important step in furthering a goal of the Actor Model that application programmers need not be so concerned with low-level system details.” Obviously, that made the Orleans team very proud.
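To illustrate the virtual actor idea from the answer above, here is a toy, single-process sketch: callers ask for an actor by its identity and never create or destroy it, while the registry activates an in-memory instance on first use and drops instances that have been idle too long. This is a hypothetical illustration of the concept only, not the Orleans API (in Orleans you obtain a grain reference through the grain factory, and the runtime places activations across silos). `VirtualActorRegistry`, `DeactivateIdle` and `PlayerActor` are made-up names, and time is passed in explicitly to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of on-demand activation and idle deactivation.
// Not thread-safe; a real runtime also distributes activations across servers.
public sealed class VirtualActorRegistry<TActor> where TActor : class
{
    private sealed class Activation
    {
        public TActor Instance;
        public DateTime LastUsed;
    }

    private readonly Dictionary<string, Activation> activations = new Dictionary<string, Activation>();
    private readonly Func<string, TActor> activate;

    public VirtualActorRegistry(Func<string, TActor> activate)
    {
        this.activate = activate;
    }

    public int ActivationCount => activations.Count;

    // Callers never create actors: they just ask for one by id.
    public TActor Get(string id, DateTime now)
    {
        if (!activations.TryGetValue(id, out var activation))
        {
            activation = new Activation { Instance = activate(id) };
            activations[id] = activation;
        }
        activation.LastUsed = now;
        return activation.Instance;
    }

    // Remove activations that have not been used for longer than maxIdle.
    public void DeactivateIdle(DateTime now, TimeSpan maxIdle)
    {
        var idle = activations.Where(kv => now - kv.Value.LastUsed > maxIdle)
                              .Select(kv => kv.Key)
                              .ToList();
        foreach (var id in idle)
            activations.Remove(id);
    }
}

// A hypothetical actor type to use with the registry.
public sealed class PlayerActor
{
    public string Id { get; }
    public int Score { get; set; }
    public PlayerActor(string id) { Id = id; }
}
```

The calling code looks the same before and after an idle activation is removed: it keeps asking for "player-1", and the registry quietly creates a fresh instance. That fresh instance has lost any state held only in memory, which is why state that must survive deactivation has to be persisted.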

DD: In Microsoft Orleans, virtual actors are also known as grains. They run within host processes called silos. Why were these names devised?

SB: Early on we had the intuition that we’d end up with a novel programming model. In hindsight, that was prescient. The term “grain” is distinct from the already overloaded term “actor”, where it’s hard to tell upfront whether somebody is talking about single-machine concurrency or about the distributed case. In the end, “grain” is shorthand for “Orleans actor” or “virtual actor”. When we needed to name the runtime containers for grains, we naturally went down the agricultural path with “silos”. Just imagine the confusion if we had called them “containers”.

DD: Who is using Microsoft Orleans, and how well does it support their systems’ scalability?

SB: Orleans has been used in production inside Microsoft since 2011. It is enjoying growing adoption outside Microsoft after we publicly released a binary preview, and then open-sourced it. We see a wide range of systems built with Orleans: online gaming, finance, collaboration solutions, fraud detection, social network analysis, and more. One of the hottest areas is IoT. There we see Orleans-based systems that manage devices like thermostats and even, I’m not joking, mousetraps. One of the fascinating projects is the green power storage facility in Hawaii. We showed some scalability numbers in our paper.

DD: Is Microsoft Orleans meant only to be used in the cloud?

SB: The advent of the cloud brought the challenges of building reliable scalable distributed systems into the spotlight. Orleans as a project focused on solving those fundamental challenges. As a result, Orleans is equally applicable in any cloud and on premises. We have customers running Orleans in AWS and some interested in GCP, but also those that use it in private datacenters and on corporate IT infrastructures. Our first target was naturally Azure, and we built providers and extensions for it first. But Orleans was designed with extensibility in mind, and it is fairly easy to make it run pretty much anywhere.

Development and Support

DD: What is the Microsoft Orleans team currently working on, and is there a roadmap for future development?

SB: Our current focus is on making Orleans run on .NET Core, support for geo-distribution, improvements to streaming, application lifecycle, and the upgrade and versioning process. Even though the project moved out of Microsoft Research to the product group, we have an ongoing collaboration with Research, which gives us a healthy pipeline of new ideas and advanced prototypes. Support for geo-distribution is one example. We also have support for indexing of actors, ACID multi-actor transactions, and reactive computations at various stages of readiness. Orleans is one of the most popular Microsoft open source projects, right next to .NET Core and Roslyn. We continue to work on it and recently substantially increased our investment.

DD: What resources are available for developers building their systems upon Microsoft Orleans?

SB: We keep hearing that our documentation is very good compared to other open source projects, but we keep improving it (and samples) as people point to topics that aren’t clear or can be explained better. The community around the project is our biggest “brain trust” and the best source of support for new people. It’s an amazing group of experienced and passionate engineers around the globe that come to our GitHub repo and Gitter chat not only because they use Orleans for their projects and contribute to it, but also because they enjoy hanging out with this very welcoming and encouraging community that always tries to help, even with topics not directly related to Orleans.

Custom Loggers in Akka .NET

Akka .NET supports a flexible logging mechanism that can adapt to various logging providers, as we have seen in my earlier article on logging with Akka .NET. Aside from the default logger that writes to the console, you can plug in various loggers of your own choosing (e.g. NLog), set them up in configuration, and work with them using a common interface.

Sometimes, you may have specific logging requirements that are not covered by any of the existing logging plugins for Akka .NET. In such cases, you would need to write your own custom logger. Unfortunately, the Akka .NET Logging documentation does not explain how to do this at the time of writing this article.

This article is intended to fill this gap, explaining how to write a custom logger for Akka .NET, but also touching upon various topics such as reading custom configuration, actor lifecycle hooks, and cleanup of resources used by the ActorSystem. The source code for this article is available at the Gigi Labs BitBucket repository.

In a Nutshell

Akka .NET Logging is basically a port of Akka logging. Any logger is an actor that receives the following messages: Debug, Info, Warning, Error and InitializeLogger. These are all standard Akka .NET messages, and we will see how to use them in a minute. At the time of writing this article, I’ve come across the following loggers that one can refer to in order to see how custom loggers are built:

Main Program

For the sake of this article, we can go along with the following simple program:

            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var logger = Logging.GetLogger(actorSystem, actorSystem, null);
                logger.Info("ActorSystem created!");

                Console.WriteLine("Press ENTER to exit...");
                Console.ReadLine();
            }

In my earlier Akka .NET logging article, we had done logging only from within actors. The above code shows how you can use the same configured logger directly from the ActorSystem.

We are now going to need a little configuration.

<?xml version="1.0" encoding="utf-8" ?>
<configuration>

  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka" />
  </configSections>

  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1" />
  </startup>

  <akka>
    <hocon>
      <![CDATA[
        akka
        {
          loglevel = DEBUG
          loggers = ["AkkaNetCustomLogger.Loggers.ConsoleLogger, AkkaNetCustomLogger"]

          actor
          {
            debug
            {
              receive = on      # log any received message
              autoreceive = on  # log automatically received messages, e.g. PoisonPill
              lifecycle = on    # log actor lifecycle changes
              event-stream = on # log subscription changes for Akka.NET event stream
              unhandled = on    # log unhandled messages sent to actors
            }
          }
        }
      ]]>
    </hocon>
  </akka>

</configuration>

Here, we’re turning on all internal Akka .NET logging so that we can automatically get some logging output.

The important thing here is the loggers configuration, where we’re specifying the custom logger that we want Akka .NET to use. This is exactly how we had set up Akka .NET to use NLog in my earlier article, but this time we’re going to use a class called ConsoleLogger (which we have yet to create).

Writing the ConsoleLogger

As I mentioned earlier, any custom logger needs to handle five messages: Debug, Info, Warning, Error, and InitializeLogger. That’s the first thing we’ll set up our ConsoleLogger to do.

    using System;
    using Akka.Actor;
    using Akka.Event;

    public class ConsoleLogger : ReceiveActor
    {
        public ConsoleLogger()
        {
            Receive<Debug>(e => this.Log(LogLevel.DebugLevel, e.ToString()));
            Receive<Info>(e => this.Log(LogLevel.InfoLevel, e.ToString()));
            Receive<Warning>(e => this.Log(LogLevel.WarningLevel, e.ToString()));
            Receive<Error>(e => this.Log(LogLevel.ErrorLevel, e.ToString()));
            Receive<InitializeLogger>(_ => this.Init(Sender));
        }

        // ...
    }

The actual logging messages will use a common Log() helper method, since they differ only in log level (the operation of writing to the log destination is the same for all). Note that the string representation of each of these messages already includes the log level, so we don’t need to write it separately. In the case of the ConsoleLogger, we are passing in the LogLevel merely so we can use a different colour for each level.

The special message we haven’t covered yet is InitializeLogger. When the ActorSystem creates the logger actor, the internal event bus needs to know whether the logger is ready to start accepting messages. It does this by sending the logger actor an InitializeLogger message, and expects a LoggerInitialized message in return:

        private void Init(IActorRef sender)
        {
            using (var consoleColour = new ScopedConsoleColour(ConsoleColor.Green))
                Console.WriteLine("Init");

            sender.Tell(new LoggerInitialized());
        }

Aside from sending back the required message, I’m also logging the init operation itself, so that we can later observe the sequence of events. I am using my trusty ScopedConsoleColour class to change the colour, and then reset it back after the message has been written.

If you don’t send the LoggerInitialized message back, the actor system reports a timeout from initialising the logger, and basically you get no logging. (Well, ironically, the timeout itself is logged… presumably using the DefaultLogger as a fallback.)

[Screenshot: the actor system reports a timeout while initialising the logger]
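The ScopedConsoleColour class used in Init() comes from an earlier article and isn’t listed here. A minimal sketch, assuming it simply remembers the current foreground colour, switches to the requested one, and restores the remembered colour when disposed (the original may differ in detail), could look like this:

```csharp
using System;

// Sketch: temporarily changes the console foreground colour for the
// lifetime of a using block, then restores the previous colour.
public sealed class ScopedConsoleColour : IDisposable
{
    private readonly ConsoleColor oldColour;

    public ScopedConsoleColour(ConsoleColor newColour)
    {
        this.oldColour = Console.ForegroundColor; // remember what to go back to
        Console.ForegroundColor = newColour;
    }

    public void Dispose()
    {
        Console.ForegroundColor = this.oldColour;
    }
}
```

Wrapping the colour change in IDisposable means the colour is restored even if writing the message throws, because the using statement always calls Dispose().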

Now we can implement our Log() helper method:

        private void Log(LogLevel level, string message)
        {
            ConsoleColor colour = ConsoleColor.Gray;

            switch (level)
            {
                case LogLevel.DebugLevel:
                    colour = ConsoleColor.Gray;
                    break;
                case LogLevel.InfoLevel:
                    colour = ConsoleColor.White;
                    break;
                case LogLevel.WarningLevel:
                    colour = ConsoleColor.Yellow;
                    break;
                case LogLevel.ErrorLevel:
                    colour = ConsoleColor.Red;
                    break;
                default: // shouldn't happen
                    goto case LogLevel.InfoLevel;
            }

            using (var consoleColour = new ScopedConsoleColour(colour))
                Console.WriteLine(message);
        }

Here we’re doing nothing but using a different colour per level, and writing the message to the console (which is pretty much what StandardOutLogger does). Remember, the level is already part of the message, so we don’t need to format it into the output message. (And if you’re outraged at the use of goto above, I suggest you read about goto case in C#.)
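For readers unfamiliar with goto case, here is a minimal standalone illustration of what it does in the Log() method above. It uses a hypothetical stand-in enum, `Level`, instead of Akka .NET’s LogLevel, so that it compiles without any packages. The default branch jumps straight to the Info branch, so any unexpected level gets Info’s colour.

```csharp
using System;

// Hypothetical stand-in for Akka .NET's LogLevel, used only in this example.
public enum Level { Debug, Info, Warning, Error }

public static class LevelColours
{
    public static ConsoleColor For(Level level)
    {
        switch (level)
        {
            case Level.Debug:
                return ConsoleColor.Gray;
            case Level.Info:
                return ConsoleColor.White;
            case Level.Warning:
                return ConsoleColor.Yellow;
            case Level.Error:
                return ConsoleColor.Red;
            default:
                // jump to the Info label: unknown levels are shown like Info
                goto case Level.Info;
        }
    }
}
```

Unlike a general goto, goto case can only target a case label of the enclosing switch, which keeps the jump local and easy to follow.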

Actor Life Cycle Hooks

If your custom logger depends on external resources (which is most likely the case) such as the filesystem or a database, you will want to initialise those resources when the logger actor is created, and clean them up when it is destroyed. That work typically goes into actor life cycle hooks, i.e. overridable methods that allow you to run arbitrary code when an actor starts, stops, or restarts.

We don’t need to do this for ConsoleLogger, so we will simply log the start and stop operations instead. However, we will use these hooks more realistically when we implement the FileLogger.

        protected override void PreStart()
        {
            base.PreStart();

            using (var consoleColour = new ScopedConsoleColour(ConsoleColor.Green))
                Console.WriteLine("PreStart");
        }

        protected override void PostStop()
        {
            using (var consoleColour = new ScopedConsoleColour(ConsoleColor.Green))
                Console.WriteLine("PostStop");

            base.PostStop();
        }

We can now run this and see the logging in action:

[Screenshot: console output from the ConsoleLogger]

Now, for something interesting, put a breakpoint inside PostStop(), and press ENTER to cause the program to continue and terminate. One would expect PostStop() to run as the ActorSystem shuts down, but in fact the breakpoint is never hit.

Next, go back to the main program, and add a second Console.ReadLine() at the end:

            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var logger = Logging.GetLogger(actorSystem, actorSystem, null);
                logger.Info("ActorSystem created!");

                Console.WriteLine("Press ENTER to exit...");
                Console.ReadLine();
            }

            Console.ReadLine();

Run it again, and when you press ENTER, the breakpoint is hit and the PostStop event is written to the console while waiting for the second ENTER:

[Screenshot: PostStop written to the console after the ActorSystem is disposed]

When we disposed the ActorSystem earlier, the program terminated before the ActorSystem had the chance to do its cleanup work. It appears that when the ActorSystem shuts down, it doesn’t clean up resources right away; most likely this is done using asynchronous messaging, just like in the rest of Akka .NET. For this reason, in your program’s stopping code, you might want to wait a little between destroying the ActorSystem and actually letting the application terminate, in order to let it gracefully free its resources.
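Rather than guessing how long “a little” should be, it is more robust to wait on a signal that completes once cleanup has actually finished. The sketch below models the behaviour described above in plain C# and does not use Akka .NET: disposing a hypothetical `MiniActorSystem` only requests shutdown, the cleanup (the equivalent of PostStop()) runs later on a background task, and the exposed `WhenTerminated` task lets the caller wait for it. For reference, later Akka .NET releases provide `ActorSystem.Terminate()` and `ActorSystem.WhenTerminated` for this purpose; check what your version offers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical model of an actor system whose shutdown is asynchronous.
public sealed class MiniActorSystem : IDisposable
{
    private readonly BlockingCollection<Action> mailbox = new BlockingCollection<Action>();
    private readonly Task worker;

    public bool CleanedUp { get; private set; }

    public MiniActorSystem()
    {
        worker = Task.Run(() =>
        {
            foreach (var work in mailbox.GetConsumingEnumerable())
                work();

            // Equivalent of PostStop(): runs only after the mailbox is drained.
            CleanedUp = true;
        });
    }

    // Completes when cleanup has really finished.
    public Task WhenTerminated => worker;

    public void Post(Action work) => mailbox.Add(work);

    // Like disposing the ActorSystem: requests shutdown and returns at once.
    public void Dispose() => mailbox.CompleteAdding();
}
```

Usage: after the using block that disposes the system, call `system.WhenTerminated.Wait()` (ideally with a timeout) before letting Main() return, so the process only exits once cleanup has run.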

Writing the FileLogger

We will now write a second custom logger, this time one that writes to file.

First, change the HOCON configuration to use the new logger.

          loggers = ["AkkaNetCustomLogger.Loggers.FileLogger, AkkaNetCustomLogger"]

Next, let’s write the FileLogger. As with the ConsoleLogger, we need to handle the same five messages:

    using System.IO;
    using System.Threading.Tasks;
    using Akka.Actor;
    using Akka.Event;

    public class FileLogger : ReceiveActor
    {
        private StreamWriter writer;

        public FileLogger()
        {
            ReceiveAsync<Debug>(async e => await this.LogAsync(e.ToString()));
            ReceiveAsync<Info>(async e => await this.LogAsync(e.ToString()));
            ReceiveAsync<Warning>(async e => await this.LogAsync(e.ToString()));
            ReceiveAsync<Error>(async e => await this.LogAsync(e.ToString()));
            Receive<InitializeLogger>(_ => Sender.Tell(new LoggerInitialized()));
        }

        // ...
    }

The logger keeps a reference to a StreamWriter, which wraps the file we will be writing to.

The LogAsync() method simply dumps the messages into that StreamWriter (remember, streams and toilets must always be flushed):

        private async Task LogAsync(string message)
        {
            await this.writer.WriteLineAsync(message);
            await this.writer.FlushAsync();
        }

We can open the file itself either during the InitializeLogger handler or in PreStart(). Let’s use a fixed filename for now:

        protected override void PreStart()
        {
            base.PreStart();

            string filePath = "log.txt";
            var fileStream = File.OpenWrite(filePath);
            this.writer = new StreamWriter(fileStream);
        }

We can then do the cleanup in PostStop():

        protected override void PostStop()
        {
            // dispose the StreamWriter, and implicitly the
            // underlying FileStream with it
            this.writer.Dispose();

            base.PostStop();
        }

We only need to Dispose() our StreamWriter; doing that will automatically also close the underlying FileStream.

Now, while this is enough to log to file, there is a problem. We can’t actually use another program to read the log file while the program is running:

[Screenshot: another program cannot open log.txt because it is in use]

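We can fix this by changing the way we open the file. FileShare.Read lets other programs open the file for reading while we are writing to it, and FileMode.Append adds to the end of an existing log instead of overwriting it from the start.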

        protected override void PreStart()
        {
            base.PreStart();

            string filePath = "log.txt";
            var fileStream = File.Open(filePath, FileMode.Append, FileAccess.Write, FileShare.Read);
            this.writer = new StreamWriter(fileStream);
        }

You’ll see that the log messages are now written to file:

[Screenshot: log messages written to the log file]
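The difference comes down to the FileShare argument. File.OpenWrite() opens the file with FileShare.None, so any other attempt to open it fails while our writer holds it; FileShare.Read lets readers in. The following standalone sketch (plain .NET, no Akka) opens a file the same way as PreStart() above, writes and flushes a line, and then reads it back through a second handle while the writer is still open. `SharedLogDemo` is a hypothetical name. Note that the reader passes FileShare.ReadWrite, because it must tolerate the writer that already has the file open.

```csharp
using System;
using System.IO;

public static class SharedLogDemo
{
    // Writes a line through a writer opened like FileLogger.PreStart(),
    // then reads the file through a second handle while the writer is open.
    public static string WriteAndReadConcurrently(string filePath, string line)
    {
        var fileStream = File.Open(filePath, FileMode.Append, FileAccess.Write, FileShare.Read);
        using (var writer = new StreamWriter(fileStream))
        {
            writer.WriteLine(line);
            writer.Flush(); // without this, the line may still sit in the writer's buffer

            // Another reader (e.g. a text editor) opening the file at this point.
            // It must allow writing, because our writer already has the file open.
            using (var readStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
            using (var reader = new StreamReader(readStream))
            {
                return reader.ReadToEnd();
            }
        }
    }
}
```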

However, there is another problem. If you press ENTER to close the program, the following happens:

[Screenshot: error on shutdown caused by logging and PostStop() overlapping]

Upon further inspection, it seems that logging messages are coming in around the same time that PostStop() is running, causing a race condition on the underlying resource. I’ve opened a bug report for this, but until this is sorted, you can flush synchronously as a workaround:

        private async Task LogAsync(string message)
        {
            await this.writer.WriteLineAsync(message);
            this.writer.Flush();
        }

So, if there is this problem, how do existing logging adapters that have a file-based component (e.g. NLog) do their cleanup? Well, I’ve checked a few, and it seems they don’t.
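Another defensive option against the race described above is to make the writer itself tolerate the overlap. The plain C# sketch below assumes, as observed above, that a write and disposal can happen at around the same time. A SemaphoreSlim serialises writes and disposal, so Dispose() waits for an in-flight write to finish, and writes arriving after disposal are dropped instead of hitting a closed stream. `GuardedLogWriter` is a hypothetical helper, not part of Akka .NET, and it is not the workaround used in this article.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: serialises async writes and disposal.
public sealed class GuardedLogWriter : IDisposable
{
    // Deliberately never disposed, so late callers can still acquire it safely.
    private readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);
    private readonly TextWriter writer;
    private bool disposed;

    public GuardedLogWriter(TextWriter writer)
    {
        this.writer = writer;
    }

    public async Task LogAsync(string message)
    {
        await gate.WaitAsync();
        try
        {
            if (disposed)
                return; // too late: the log file is already closed

            await writer.WriteLineAsync(message);
            await writer.FlushAsync();
        }
        finally
        {
            gate.Release();
        }
    }

    public void Dispose()
    {
        gate.Wait(); // waits for any write in progress to complete
        try
        {
            if (disposed)
                return;
            disposed = true;
            writer.Dispose();
        }
        finally
        {
            gate.Release();
        }
    }
}
```

In a logger actor, PreStart() would wrap the StreamWriter in this helper, the message handlers would call LogAsync(), and PostStop() would call Dispose(). The trade-off is that messages logged after shutdown are silently lost.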

Loading Custom Configuration

We’ve managed to write a file logger, but we’re using a fixed filename. How can we make it configurable?

It turns out we can just add an arbitrary setting anywhere in the HOCON configuration, and read it from inside the actor. So, let’s add this:

        akka
        {
          loglevel = DEBUG
          loggers = ["AkkaNetCustomLogger.Loggers.FileLogger, AkkaNetCustomLogger"]
          logfilepath = "logfile.txt"

We can get to the setting we want using the configuration system in Akka .NET:

        protected override void PreStart()
        {
            base.PreStart();

            string filePath = "log.txt";

            filePath = Context.System.Settings.Config
                .GetString("akka.logfilepath", filePath);

            var fileStream = File.Open(filePath, FileMode.Append, FileAccess.Write, FileShare.Read);
            this.writer = new StreamWriter(fileStream);
        }

Basically we’re reading the “akka.logfilepath” key from the HOCON config. We’re also passing in filePath as a default in case the setting is not found.

Running Multiple Loggers

So far we’ve been using either one logger or the other. But as you may have noticed, the loggers setting in HOCON is actually an array, so there is nothing stopping us from using multiple loggers at once:

        akka
        {
          loglevel = DEBUG
          loggers = ["AkkaNetCustomLogger.Loggers.FileLogger, AkkaNetCustomLogger",
                     "AkkaNetCustomLogger.Loggers.ConsoleLogger, AkkaNetCustomLogger"]
          logfilepath = "logfile.txt"

Yes, it works:

[Screenshot: output from both loggers running at once]

Akka .NET IActorRef: Local or Remote?

Akka .NET supports location transparency. When you use an IActorRef, your application shouldn’t care whether that actor is running on the same machine or somewhere else on the network. You can change where an actor runs as a matter of configuration, and your application will never know the difference.

Although an application shouldn’t depend on the physical location of an actor to perform its logic, knowing where an actor is running can be useful (e.g. when troubleshooting issues).

[Screenshot: the IsLocal property]

There is an IsLocal property that you can use to tell whether an actor is running locally or remotely. However, this is not immediately accessible from the IActorRef. Instead, you need to cast your IActorRef to an InternalActorRefBase to be able to use it:

(localChatActor as InternalActorRefBase).IsLocal

If you’re working with an ActorSelection (which you probably are if you’re using remote actors), you will first want to get hold of an IActorRef for it. You can do this via the ActorSelection’s Anchor property.

(remoteChatActor.Anchor as InternalActorRefBase).IsLocal

This will allow you to check whether an actor is running locally or remotely. But remember: use this only for diagnostic purposes, and don’t make your application code dependent on it.

Replaying Chess Games using Akka.Persistence Event Sourcing

Introduction: Event Sourcing

In most modern games, it is conventional wisdom that you should save your progress regularly, lest you take a wrong turn and get mauled:

[Screenshot: saving a game in StarCraft]

Software is no different. If you’re running an actor system in memory, you risk losing state if anything happens to the actors or to the whole actor system.

It is thus important to save the state of your actors, but how?

In message-based systems such as Akka .NET, a popular approach towards recovery is to save messages as they arrive, and in case of failure, simply handle them again in the same order to restore the last state. This is known as event sourcing.

Akka .NET provides event sourcing support thanks to the Akka.Persistence module, as we shall see shortly.
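Stripped of any framework, the idea fits in a few lines. In this plain C# sketch (hypothetical names and an in-memory journal; this is not the Akka.Persistence API), every incoming event is appended to a journal before it is applied to the state, and recovery is simply replaying the journal, in order, into a fresh instance. Akka.Persistence does the same job with a durable journal and replays the persisted events into an actor when it recovers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: something that has already happened.
public sealed class Deposited
{
    public decimal Amount { get; }
    public Deposited(decimal amount) { Amount = amount; }
}

// State that can be rebuilt entirely from its events.
public sealed class Account
{
    public decimal Balance { get; private set; }
    public void Apply(Deposited e) => Balance += e.Amount;
}

public sealed class EventSourcedAccount
{
    private readonly List<Deposited> journal; // stands in for durable storage
    public Account State { get; } = new Account();

    public EventSourcedAccount(List<Deposited> journal)
    {
        this.journal = journal;

        // Recovery: replay every stored event, in the original order.
        foreach (var e in journal)
            State.Apply(e);
    }

    public void Handle(Deposited e)
    {
        journal.Add(e);  // persist first...
        State.Apply(e);  // ...then update the in-memory state
    }
}
```

Simulating a crash amounts to discarding the EventSourcedAccount instance while keeping the journal. A new instance built from the same journal ends up with the same balance.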

The source code for this article is available at the Gigi Labs BitBucket repository.

Chess Scenario

Chess is a great example with which to demonstrate event sourcing because a chess game consists of a sequential set of moves which can be represented using a specific notation. We can save these moves and later replay the game.

It is also very easy to draw a chess board in a console application. It’s just an 8×8 grid with the pieces on it. We can represent the various chess pieces using different letters, and use uppercase and lowercase (instead of white and black) to distinguish between the two players’ pieces, as GNU Chess does:

[Screenshot: the GNU Chess console board]

Chess can get quite complex and I really don’t want to get lost in the details (since this article is about Akka.Persistence), so we’ll make a number of assumptions as follows to keep things simple:

  • No validation. Pieces can be moved anywhere on the board.
  • Both players use the same console window and take it in turns.
  • No game state (i.e. you can never win or lose).
  • Input will be of the format: move <from> to <to>, for example move e2 to e4.

In other words, we’re not really building a chess game. We’re just emulating the board and the movement so that we can store moves and replay them later.

Prerequisites

To follow along, install the following NuGet packages:

Install-Package Akka
Install-Package Akka.Persistence -pre
Install-Package Akka.Persistence.SqlServer -pre

Akka.Persistence seems to be prerelease at the time of writing this article, so you will need the -pre flag.

Akka.Persistence is very flexible in terms of where messages are saved. In this example we’re going to use SQL Server, but there are a whole load of storage implementations. Just look up “Akka.Persistence” in NuGet and you’ll see the available options:

[Screenshot: Akka.Persistence packages in NuGet]

System Overview

[Diagram: overview of the chess example's actors]

We have a ChessGameActor that holds the game state (i.e. the chess board). I was originally going to use a string[] for this, but since we need to update individual characters, the immutable nature of strings becomes a problem. We need to use a jagged array of chars (char[][]) instead.

    public class ChessGameActor : ReceiveActor
    {
        private Guid gameId;
        private IActorRef rendererActor;

        private char[][] chessBoard = new char[][]
        {
            "rnbqkbnr".ToCharArray(),
            "pppppppp".ToCharArray(),
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "PPPPPPPP".ToCharArray(),
            "RNBQKBNR".ToCharArray()
        };

        // ...
    }

We also have a ChessBoardDrawingActor responsible for actually drawing the chess board. The ChessGameActor has a reference to it so that it can ask it to redraw the board when someone moves a piece.

The details of how ChessBoardDrawingActor is implemented are omitted for brevity (refer to the source code if you need it), but it basically just handles DrawChessBoardMessages coming from the ChessGameActor:

    public class ChessBoardDrawingActor : ReceiveActor
    {
        public ChessBoardDrawingActor()
        {
            this.Receive<DrawChessBoardMessage>(m => Handle(m));
        }

        public void Handle(DrawChessBoardMessage message)
        {
            Console.Clear();

            var chessBoard = message.ChessBoard;

            // ...
        }
    }

Although you technically could do this from the ChessGameActor itself, I consider it good practice to separate state/logic from presentation. Reminiscent of the MVC pattern, this makes it easy to support various output devices (e.g. GUI window, web, mobile, etc) without having to change the core of your game.

The DrawChessBoardMessage is simply a copy of the chessboard:

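    using System.Linq;

    public class DrawChessBoardMessage
    {
        public char[][] ChessBoard { get; }

        public DrawChessBoardMessage(char[][] chessBoard)
        {
            // copy each row, so that later moves on the game actor's board
            // can't change a message that has already been sent
            this.ChessBoard = chessBoard
                .Select(row => (char[])row.Clone())
                .ToArray();
        }
    }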

Although we could micro-optimise this by sending a diff instead (i.e. old position to erase, and new position to draw) as we do in the Akka.Remote multiplayer game example, the data here is so small as to carry negligible overhead. Besides, it’s common practice in games to just redraw everything (which may not be the fastest approach, but in complex environments tracking individual changes quickly becomes impractical).

The main program is responsible for creating the actor system, along with these two actors:

        static void Main(string[] args)
        {
            Console.Title = "Akka .NET Persistence Chess Example";

            using (var actorSystem = ActorSystem.Create("Chess"))
            {
                var drawingProps = Props.Create<ChessBoardDrawingActor>();
                var drawingActor = actorSystem.ActorOf(drawingProps, "DrawingActor");

                Guid gameId = Guid.Parse("F56079D3-4625-409A-B734-C9BDEBA6D7FA");
                var gameProps = Props.Create<ChessGameActor>(gameId, drawingActor);
                var gameActor = actorSystem.ActorOf(gameProps, "GameActor");

                HandleInput(gameActor);

                Console.ReadLine();
            }
        }

The input handling logic expects to receive moves in the format move <from> to <to>; once it extracts the from and to locations, it sends a MoveMessage to the ChessGameActor.

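        static void HandleInput(IActorRef chessGameActor)
        {
            string input;

            // Console.ReadLine() returns null on Ctrl+Z (end of input), which ends the loop
            while ((input = Console.ReadLine()) != null)
            {
                var tokens = input.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);

                // check first word; an empty line has no tokens at all
                switch (tokens.Length > 0 ? tokens[0] : string.Empty)
                {
                    case "move": // e.g. move e2 to e4
                        if (tokens.Length == 4 && tokens[2] == "to")
                        {
                            string from = tokens[1];
                            string to = tokens[3];
                            var message = new MoveMessage(from, to);

                            chessGameActor.Tell(message);
                        }
                        else
                        {
                            Console.WriteLine("Usage: move <from> to <to>");
                        }
                        break;
                    default:
                        Console.WriteLine("Invalid command.");
                        break;
                }
            }
        }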
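Nothing in the input handling checks that the squares themselves are valid, so input such as move z9 to e4 would reach TranslateMove() and index outside the board. Here is a minimal validation sketch, assuming squares are always a lowercase file a-h followed by a rank 1-8; the MoveParser class and its TryParse() method are hypothetical, not part of the original code:

```csharp
using System;

// Hypothetical helper: accepts "move <from> to <to>", where each square is
// a lowercase file a-h followed by a rank 1-8 (e.g. "move e2 to e4").
public static class MoveParser
{
    public static bool TryParse(string input, out string from, out string to)
    {
        from = to = null;
        if (input == null)
            return false;

        var tokens = input.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
        if (tokens.Length != 4 || tokens[0] != "move" || tokens[2] != "to")
            return false;

        if (!IsSquare(tokens[1]) || !IsSquare(tokens[3]))
            return false;

        from = tokens[1];
        to = tokens[3];
        return true;
    }

    // True for "a1" .. "h8", false for anything else
    private static bool IsSquare(string s) =>
        s.Length == 2 && s[0] >= 'a' && s[0] <= 'h' && s[1] >= '1' && s[1] <= '8';
}
```

HandleInput() could then call MoveParser.TryParse(input, out var from, out var to) and only Tell() a MoveMessage when it returns true.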

In fact, a MoveMessage is simply a combination of from and to locations:

    public class MoveMessage
    {
        public string From { get; }
        public string To { get; }

        public MoveMessage(string from, string to)
        {
            this.From = from;
            this.To = to;
        }

        public override string ToString()
        {
            return $"move {this.From} to {this.To}";
        }
    }

However, these locations are still in the format entered by the user (e.g. e4). When the GameActor receives a MoveMessage, it must first translate the locations into indices in the 2-dimensional array that we’re using as a chess board. This is done in a method called TranslateMove() which does some funky ASCII manipulation…

        private Point TranslateMove(string move)
        {
            // e.g. e4: e is the column, and 4 is the row

            char colChar = move[0];
            char rowChar = move[1];

            int col = colChar - 'a';         // 'a' (ASCII 97) -> 0, ..., 'h' -> 7
            int row = 8 - (rowChar - '0');   // '8' -> 0, ..., '1' -> 7 (rank 8 is the first row)

            return new Point(col, row);
        }

…and returns an instance of a Point class. Point is your typical 2D coordinate.

    public class Point
    {
        public int X { get; }
        public int Y { get; }

        public Point(int x, int y)
        {
            this.X = x;
            this.Y = y;
        }
    }
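To make the arithmetic concrete, here is a standalone copy of TranslateMove(), wrapped in a hypothetical MoveTranslation class, with Point redeclared so the snippet compiles by itself. Subtracting 'a' (ASCII 97) turns the file letter into a column index, and subtracting the rank from 8 puts rank 8 in row 0, which is the layout the formula implies:

```csharp
using System;

public class Point
{
    public int X { get; }
    public int Y { get; }

    public Point(int x, int y)
    {
        this.X = x;
        this.Y = y;
    }
}

// Hypothetical standalone copy of TranslateMove(), for experimenting outside the actor
public static class MoveTranslation
{
    public static Point TranslateMove(string move)
    {
        char colChar = move[0]; // file: 'a'..'h'
        char rowChar = move[1]; // rank: '1'..'8'

        int col = colChar - 'a';        // 'a' -> 0, 'h' -> 7
        int row = 8 - (rowChar - '0');  // rank 8 -> row 0, rank 1 -> row 7

        return new Point(col, row);
    }
}
```

With this mapping, e2 becomes (4, 6), e4 becomes (4, 4), a8 becomes (0, 0) and h1 becomes (7, 7).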

Once the GameActor translates these coordinates, it can update the state of the chess board, and send a DrawChessBoardMessage to the ChessBoardDrawingActor to redraw the chess board.

        public void Handle(MoveMessage message)
        {
            var fromPoint = this.TranslateMove(message.From);
            var toPoint = this.TranslateMove(message.To);

            char piece = this.chessBoard[fromPoint.Y][fromPoint.X];

            chessBoard[fromPoint.Y][fromPoint.X] = ' '; // erase old location
            chessBoard[toPoint.Y][toPoint.X] = piece; // set new location

            this.RedrawBoard();
        }

        private void RedrawBoard()
        {
            var drawMessage = new DrawChessBoardMessage(this.chessBoard);
            this.rendererActor.Tell(drawMessage);
        }
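The effect of Handle() on the board can be tried without any actors. The sketch below assumes an initial layout that this excerpt doesn’t show (uppercase letters for White, lowercase for Black, rank 8 in row 0, a space for an empty square); the BoardSketch class and its methods are hypothetical:

```csharp
using System;

// Hypothetical, actor-free model of what Handle(MoveMessage) does to the board.
public static class BoardSketch
{
    public static char[][] CreateInitialBoard()
    {
        return new[]
        {
            "rnbqkbnr".ToCharArray(), // rank 8 (row 0)
            "pppppppp".ToCharArray(), // rank 7
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "PPPPPPPP".ToCharArray(), // rank 2
            "RNBQKBNR".ToCharArray(), // rank 1 (row 7)
        };
    }

    // Same arithmetic as TranslateMove(): returns (column, row)
    private static (int col, int row) Translate(string square)
    {
        return (square[0] - 'a', 8 - (square[1] - '0'));
    }

    public static void ApplyMove(char[][] board, string from, string to)
    {
        var (fromCol, fromRow) = Translate(from);
        var (toCol, toRow) = Translate(to);

        char piece = board[fromRow][fromCol];
        board[fromRow][fromCol] = ' '; // erase old location
        board[toRow][toCol] = piece;   // set new location (a capture simply overwrites)
    }
}
```

Applying the four fool’s mate moves used later in this article (f2 to f3, e7 to e5, g2 to g4, d8 to h4) leaves the black queen at board[4][7], i.e. h4. Like the original Handle(), this performs no legality checks at all.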

Saving Messages using Akka.Persistence Journaling

In order to be able to recover our actor’s state (in this case, replay chess games one move at a time), we need to store those MoveMessages as they arrive in our ChessGameActor. We can do this using the built-in functionality of Akka.Persistence.

The first thing we need to do is have our ChessGameActor inherit from ReceivePersistentActor (instead of ReceiveActor):

    public class ChessGameActor : ReceivePersistentActor

When we do this, we will be required to provide a property called PersistenceId. Fortunately, we’re passing in a Guid called gameId to our actor, so we can use that:

        public override string PersistenceId
        {
            get
            {
                return this.gameId.ToString("N");
            }
        }

        public ChessGameActor(Guid gameId, IActorRef rendererActor)
        {
            this.gameId = gameId;
            // ...
        }

We’ll see what this is for in a minute. Let’s complete our constructor:

        public ChessGameActor(Guid gameId, IActorRef rendererActor)
        {
            this.gameId = gameId;
            this.rendererActor = rendererActor;

            this.RedrawBoard();

            this.Command<MoveMessage>(PersistAndHandle, null);
        }

In the constructor, we store the game ID and a reference to the ChessBoardDrawingActor. We draw the initial board (before anyone has moved), and then we set up our message handling.

In a ReceivePersistentActor, we use Command<T>() instead of Receive<T>() to set up our message handlers. We can’t use Receive<T>() because of some ambiguity between base class methods. Likewise, the null argument is there to prevent ambiguity between overloads.

In the PersistAndHandle() method, we call the built-in Persist() method to save the message and call a handling method after the save is successful:

        public void PersistAndHandle(MoveMessage message)
        {
            Persist(message, persistedMessage => Handle(persistedMessage));
        }

The Handle() method is the same one we’ve seen before that handles the MoveMessage.

We could have done all this in one step within the Command<T>() call, as you can see in Petabridge’s Akka.Persistence blog article. However, I’m not a big fan of doing a lot of logic in nested lambdas, as they can quickly get out of hand for non-trivial scenarios.
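The ordering that Persist() guarantees can be approximated with a toy in-memory journal: the event is written first, and the handler runs only once the write has succeeded. This is a simplification under stated assumptions (InMemoryJournal and its methods are hypothetical); the real Persist() writes asynchronously to the configured journal plugin, and Akka.Persistence also holds back new commands until the handler has run.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a journal such as the SQL Server plugin.
// It only illustrates the "store first, then handle" order of Persist().
public class InMemoryJournal
{
    // One event stream per persistence id
    private readonly Dictionary<string, List<object>> events =
        new Dictionary<string, List<object>>();

    public void Persist<T>(string persistenceId, T evt, Action<T> handler)
    {
        if (!events.TryGetValue(persistenceId, out var stream))
        {
            stream = new List<object>();
            events[persistenceId] = stream;
        }

        stream.Add(evt); // 1. write the event to the journal...
        handler(evt);    // 2. ...and only then apply it to the actor's state
    }

    public IReadOnlyList<object> EventsFor(string persistenceId)
    {
        return events.TryGetValue(persistenceId, out var stream)
            ? stream
            : (IReadOnlyList<object>)Array.Empty<object>();
    }
}
```

Grouping events by persistence id is what allows many games to share one journal, as long as each game has its own id.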

Now we just need a little configuration to tell Akka.Persistence where it should store the messages whenever we call Persist():

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka" />
  </configSections>
  
  <startup> 
      <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1" />
  </startup>
    
  <akka>
    <hocon>
      <![CDATA[
      akka.persistence
      {
        journal
        {
          plugin = "akka.persistence.journal.sql-server"
          sql-server
          {
              class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
              schema-name = dbo
              auto-initialize = on
              connection-string = "Data Source=.\\SQLEXPRESS;Database=AkkaPersistenceChess;Integrated Security=True"
          }
        }
      }
      ]]>
    </hocon>
  </akka>
</configuration>

Let’s run the program and enter a few moves that we can later replay. These are the moves for fool’s mate:

move f2 to f3
move e7 to e5
move g2 to g4
move d8 to h4

Here is what it looks like, just before the last move:

[Screenshot: the chess board in the console, just before the last move]

If you check your database now, you’ll see a couple of tables. The EventJournal table has an entry for each move you played (i.e. for each message that was persisted):

[Screenshot: the EventJournal table, with one row per move]

The values in the PersistenceId column match the game ID, which is what we provided in the ChessGameActor’s PersistenceId property. If we wanted to save the progress of a different game, we would pass in a different game ID (and thus PersistenceId) to our ChessGameActor.
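Note that the "N" format string used in the PersistenceId property produces 32 lowercase hexadecimal digits with no hyphens, so the value in the PersistenceId column does not look exactly like the string passed to Guid.Parse() in Main(). A quick check, using a hypothetical helper class that applies the same expression:

```csharp
using System;

public static class PersistenceIdExample
{
    // Same expression as the ChessGameActor's PersistenceId property
    public static string ToPersistenceId(Guid gameId) => gameId.ToString("N");
}
```

For the game ID used in this article, ToPersistenceId() returns f56079d34625409ab734c9bdeba6d7fa.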

Each PersistenceId should be unique across the actor system, and there should be only one instance of a persistent actor with that PersistenceId running at any given time. Doing otherwise would compromise the state saved in the database.

Recovering State

In our ChessGameActor’s constructor, we can use the Recover<T>() method to replay messages from the persistence store and recover our state (i.e. do event sourcing), before we begin receiving new messages.

        public ChessGameActor(Guid gameId, IActorRef rendererActor)
        {
            this.gameId = gameId;
            this.rendererActor = rendererActor;

            this.RedrawBoard();

            this.Recover<MoveMessage>(RecoverAndHandle, null);
            this.Command<MoveMessage>(PersistAndHandle, null);
        }

In this case, we’ll handle the recovered messages as normal, but we will also introduce an artificial delay so that the player can actually watch each move being replayed.

        public void RecoverAndHandle(MoveMessage message)
        {
            Handle(message);
            Thread.Sleep(2000);
        }

If we run the game again now, we can watch the moves being replayed, and then we can continue playing where we left off.

[Screenshot: the moves being replayed in the console]
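The difference between the recovery and command phases can be modelled in a few lines. In this simplified, single-threaded sketch (EventSourcedGame is hypothetical, and a plain List stands in for the EventJournal table), recovery re-applies every stored move through the same Handle() method, but only live commands are persisted, so restarting does not duplicate journal entries:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a persistent actor's two phases.
public class EventSourcedGame
{
    // Stands in for the EventJournal table
    private readonly List<(string From, string To)> journal;

    // The actor's in-memory state, rebuilt from events
    public List<string> AppliedMoves { get; } = new List<string>();

    public EventSourcedGame(List<(string From, string To)> journal)
    {
        this.journal = journal;

        // Recovery phase, like Recover<MoveMessage>(RecoverAndHandle):
        // replay stored events in order, without writing them again
        foreach (var move in journal)
            Handle(move);
    }

    // Command phase, like Command<MoveMessage>(PersistAndHandle):
    // persist first, then handle
    public void Move(string from, string to)
    {
        var move = (From: from, To: to);
        journal.Add(move);
        Handle(move);
    }

    private void Handle((string From, string To) move)
    {
        AppliedMoves.Add($"move {move.From} to {move.To}");
    }
}
```

A second EventSourcedGame built over the same journal ends up with the same AppliedMoves as the first, which is exactly what lets the chess game continue where it left off.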

Summary

In this article, we have learned how Akka.Persistence supports event sourcing. This is done as follows:

  1. Actors wanting to save messages should inherit from ReceivePersistentActor.
  2. They must supply a PersistenceId which is unique and which will be used to associate saved messages with this particular actor’s state (or that of any subsequent incarnations).
  3. Use Command<T>() instead of Receive<T>() for message handling.
  4. Use Persist() to save messages before handling them.
  5. Use Recover<T>() to replay messages until the actor’s last state is restored.

The particular approach we have seen is called journaling, and it is only one feature of Akka.Persistence. This may be enough for chess games, which typically last no more than 30-40 moves. But in many other use cases with large data flows, the journal may grow a lot, and restoring state can take a while. Akka.Persistence supports a snapshot feature to help mitigate this problem.
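The idea behind snapshots can be shown with a generic fold. This is not Akka.Persistence’s snapshot API (which revolves around SaveSnapshot() and the SnapshotOffer message); it is a hypothetical helper that starts from the latest snapshot, if any, and replays only the journal entries with a higher sequence number:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper illustrating snapshot-based recovery (not the Akka.Persistence API).
public static class SnapshotRecovery
{
    public static TState Recover<TState, TEvent>(
        TState initialState,
        (long SequenceNr, TState State)? snapshot,          // latest snapshot, if any
        IReadOnlyList<(long SequenceNr, TEvent Event)> journal,
        Func<TState, TEvent, TState> apply,                 // applies one event to the state
        out int replayedCount)
    {
        // Start from the snapshot when there is one, otherwise from scratch
        TState state = snapshot.HasValue ? snapshot.Value.State : initialState;
        long lastSeqNr = snapshot.HasValue ? snapshot.Value.SequenceNr : 0;

        replayedCount = 0;

        // Only events written after the snapshot need to be replayed
        foreach (var entry in journal.Where(e => e.SequenceNr > lastSeqNr))
        {
            state = apply(state, entry.Event);
            replayedCount++;
        }

        return state;
    }
}
```

With five journaled events and a snapshot taken after the third, only two events need to be replayed to reach the same state as a full replay.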

Asynchronous and Concurrent Processing in Akka .NET Actors

Yesterday, I wrote a long article about asynchronous processing in Akka .NET actors, the dangers of async void methods, and PipeTo().

The article was written as a result of outdated Akka .NET documentation which claimed that async/await was evil in Akka .NET actors and recommended using PipeTo() instead (the documentation has since been updated). This was further exacerbated by the confusion between asynchronous and concurrent processing in the documentation.

I would like to thank Aaron Stannard and Roger Alsing from the Akka .NET team who, via separate channels, clarified a lot of the confusion. As a result, this new article covers asynchronous and concurrent processing in Akka .NET actors.

Asynchronous vs Concurrent Processing

A lot of people confuse asynchronous and concurrent processing, more so given that you can do both with .NET Tasks.

This is asynchronous:

        static async Task RunAsync()
        {
            await Task.Delay(1000);
        }

RunAsync begins executing, but is suspended while waiting for some external operation to occur (typically I/O such as reading from a database or a REST service). It resumes execution when the operation completes; until then, RunAsync itself can’t continue doing other things, although the thread it was running on is free for other work.

This, on the other hand, is concurrent:

        static async Task RunAsync()
        {
            Task.Delay(1000);
        }

Because the task is not awaited, the method proceeds with its execution while the task is running. In fact, we get a nice big warning when we do this in a method marked as async:

[Screenshot: compiler warning that the call is not awaited]

If we extend our example just a little bit, we can understand the behaviour better:

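        static Task RunAsync()
        {
            Task.Delay(1000)
                .ContinueWith(x => Console.WriteLine("After Delay"));
            Console.WriteLine("End of RunAsync()");

            // the delay is deliberately neither awaited nor returned, so it runs
            // concurrently; a completed task is returned just to satisfy the signature
            return Task.CompletedTask;
        }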

The output is as follows (the second line only appears if the program is still running when the delay elapses):

End of RunAsync()
After Delay
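The variation below makes that ordering observable. It is hypothetical in two respects: it records entries in a thread-safe queue instead of writing to the console, and it returns the continuation so a caller can wait for it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConcurrencyDemo
{
    // Hypothetical variation of RunAsync(): entries go into a queue instead of
    // the console, and the continuation is returned so the caller can wait for it
    public static Task RunAsync(ConcurrentQueue<string> log)
    {
        Task continuation = Task.Delay(1000)
            .ContinueWith(x => log.Enqueue("After Delay"));

        log.Enqueue("End of RunAsync()"); // runs immediately, without waiting for the delay
        return continuation;
    }
}
```

Right after the call returns, the queue holds only "End of RunAsync()"; once the returned task completes, "After Delay" follows it.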

Now, I could go on about definitions of synchrony and asynchrony, concurrency and parallelism, interleaving, and task-switching. But that would bore you to tears, and it’s really not the point here.

The important thing is to realise that despite using very similar C# syntax, we’re doing two very different things here. And I need to make this clear because PipeTo() is really targeted at concurrent processing, although it is described within the context of asynchrony in Akka .NET documentation.

async/await in actors

You can do async/await in an actor by using the ReceiveActor’s ReceiveAsync() method:

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            Console.WriteLine($"Begin {str}");

            await Task.Delay(2000);

            Console.WriteLine($"End {str}");
        }
    }

This is perfectly valid, and you will most likely resort to this when you need a guarantee on message order.

It is also very useful for sequential steps in I/O operations that depend directly on each other, such as when talking to a database:

        public async Task HandleAsync(string str)
        {
            const string connStr = @"Data Source=.\SQLEXPRESS;Database=test;Integrated Security=true";
            using (var conn = new SqlConnection(connStr))
            {
                await conn.OpenAsync();

                const string sql = "select * from person;";

                using (var command = new SqlCommand(sql, conn))
                using (var reader = await command.ExecuteReaderAsync())
                {
                    while (await reader.ReadAsync())
                    {
                        string id = reader["id"].ToString();
                        string name = reader["name"].ToString();

                        Console.WriteLine($"{id} {name}");
                    }
                }
            }
        }

However, this comes at a cost. First, there is a performance impact when compared to synchronous execution, because the Akka .NET pipeline needs to carry the current context across asynchronous steps.

Second, the actor will not be able to process the next message until the current one is finished. Sometimes, this is exactly what you want, such as when you need a guarantee on message order. In such situations, you don’t want to start processing the next message while waiting for an I/O operation to complete, as it is possible for the results of an older message to overwrite those of a newer message. But if your message processing does not depend on prior state, you will get much more throughput if you run tasks concurrently and use PipeTo() to collect the results (more on this later).
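The overwrite problem is easy to reproduce without actors. In this hypothetical demo, the older of two "messages" has the slower simulated I/O. Processing them one after the other, as ReceiveAsync() does, leaves the newer result in place; letting both run concurrently and writing each result as it arrives lets the older one win:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo: two "messages" both write to the same piece of state.
// The older message's simulated I/O is slower than the newer one's.
public static class OrderingDemo
{
    private static async Task<string> FakeIoAsync(string value, int delayMs)
    {
        await Task.Delay(delayMs); // stands in for a database or HTTP call
        return value;
    }

    // One message at a time, as with ReceiveAsync(): the last message wins
    public static async Task<string> SequentialAsync()
    {
        string state = await FakeIoAsync("older", 500);
        state = await FakeIoAsync("newer", 50);
        return state;
    }

    // Both in flight at once: whichever finishes last wins
    public static async Task<string> ConcurrentAsync()
    {
        string state = null;
        Task older = FakeIoAsync("older", 500).ContinueWith(t => state = t.Result);
        Task newer = FakeIoAsync("newer", 50).ContinueWith(t => state = t.Result);
        await Task.WhenAll(older, newer);
        return state;
    }
}
```

SequentialAsync() ends with "newer", while ConcurrentAsync() ends with "older", even though "older" was started first.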

Using Ask()

Ask() lets you do request/response between actors:

    public class ServiceActor : ReceiveActor
    {
        public ServiceActor()
        {
            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            await Task.Delay(2000);
            Sender.Tell(str + " done");
        }
    }

    public class MyActor : ReceiveActor
    {
        private IActorRef serviceActor;

        public MyActor(IActorRef serviceActor)
        {
            this.serviceActor = serviceActor;

            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            Console.WriteLine($"Begin {str}");

            var result = await this.serviceActor.Ask(str);

            Console.WriteLine($"End {result}");
        }
    }

Here is the output for this:

[Screenshot: console output of the Ask() example]

The approach above is something you typically want to avoid, for the same reason outlined in the previous section. If your actor is waiting for a response, it can’t process other messages in the meantime. Most of the time you should spawn a concurrent task as shown in the documentation, unless you have a good reason for not wanting to process the next message before the current one has finished. Try to design your system in a push fashion, rather than request/response.

Concurrent Execution and PipeTo()

If you have no reason to process messages in a strictly sequential manner, then you can do long-running tasks and I/O operations in a spawned task.

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.Receive<string>(x => Handle(x));
        }

        public void Handle(string str)
        {
            Task.Run(async () =>
            {
                Console.WriteLine($"Begin {str}");

                await Task.Delay(2000);

                Console.WriteLine($"End {str}");
            });
        }
    }

Since Handle() returns as soon as the task has been started, the actor moves on to the next message right away, and the processing of different messages is interleaved, as in yesterday’s article:

[Screenshot: console output with the Begin and End lines of different messages interleaved]

But this is okay, because we’re working on the assumption that the messages don’t need to be processed strictly in sequence.
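The same interleaving can be observed outside an actor system. This sketch (the InterleavingDemo class is hypothetical) mirrors Handle(): it starts the work with Task.Run() and returns at once, so all three Begin entries are typically logged before any End entry:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class InterleavingDemo
{
    // Mirrors MyActor.Handle(): starts the work and returns immediately,
    // so the next "message" is handled before the previous one has finished.
    // The task is returned only so that a caller can wait for it.
    public static Task Handle(string str, ConcurrentQueue<string> log)
    {
        return Task.Run(async () =>
        {
            log.Enqueue($"Begin {str}");

            await Task.Delay(500);

            log.Enqueue($"End {str}");
        });
    }
}
```

The order among the three Begin entries (and among the End entries) is up to the scheduler, which is fine under the assumption that messages are independent.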

Now if you want to send the result of your concurrent tasks somewhere, you can do that with PipeTo():

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.Receive<string>(x => Handle(x));
            this.Receive<int>(x => Handle(x));
        }

        public void Handle(string str)
        {
            Task.Run(async () =>
            {
                Console.WriteLine($"Begin {str}");

                await Task.Delay(2000);

                Console.WriteLine($"End {str}");

                return 42;
            }).PipeTo(Self);
        }

        public void Handle(int result)
        {
            Console.WriteLine($"Got result: {result}");
        }
    }

The result of the concurrent operation is sent to the actor you specify (in this case to itself) and processed as any other message in its mailbox. You can also do post-processing (e.g. check HTTP status code after an HTTP GET operation) by adding a ContinueWith(); see the PipeTo() article on the Petabridge blog for an example.
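PipeTo() itself is part of Akka .NET, so the following is only an analogy under stated assumptions: a hypothetical PipeToMailbox() extension, built from ContinueWith(), that delivers a task’s outcome into a queue standing in for the actor’s mailbox. The property it mirrors is that the result comes back as a message, so the actor’s state is only ever touched by its own message loop, never by the task’s thread. Akka .NET’s PipeTo() likewise sends a failure message (Status.Failure) when the task faults.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for an actor's mailbox
public class Mailbox
{
    private readonly BlockingCollection<object> messages = new BlockingCollection<object>();

    public void Tell(object message) => messages.Add(message);

    // Blocks until a message arrives (the actor runtime does this for real actors)
    public object Take() => messages.Take();
}

public static class PipeToSketch
{
    // Delivers the task's outcome to the mailbox as an ordinary message
    public static Task PipeToMailbox<T>(this Task<T> task, Mailbox recipient)
    {
        return task.ContinueWith(t =>
        {
            if (t.IsFaulted)
                recipient.Tell(t.Exception.GetBaseException()); // unwrap the AggregateException
            else if (t.IsCanceled)
                recipient.Tell(new OperationCanceledException());
            else
                recipient.Tell(t.Result);
        }, TaskContinuationOptions.ExecuteSynchronously);
    }
}
```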

More Advanced Concurrent Operations

Given that you can use both tasks and async/await in your actors, you can use all of the typical patterns you would normally use with the Task Parallel Library (TPL).

Here’s an example that simulates aggregating data from multiple external resources:

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.ReceiveAsync<string>(x => HandleAsync(x));
        }

        public async Task HandleAsync(string str)
        {
            var task1 = Task.Delay(1000).ContinueWith(x => { return 1; });
            var task2 = Task.Delay(2000).ContinueWith(x => { return 2; });
            var task3 = Task.Delay(3000).ContinueWith(x => { return 3; });

            var results = await Task.WhenAll<int>(task1, task2, task3);
            var sum = results.Sum();
            Console.WriteLine(sum);
        }
    }

WhenAll() will wait for all the tasks to complete before the method can proceed with its execution. Here’s the output:

[Screenshot: console output of the WhenAll() example, printing the sum 6]
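The ContinueWith() calls above only exist to turn a delay into a value. A small async helper does the same thing more readably, and timing the aggregation shows the benefit: the total is roughly the longest delay rather than the sum of all three. FetchAsync() and AggregationSketch are hypothetical, and the delays (300, 600 and 900 ms) are shortened for illustration:

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class AggregationSketch
{
    // Hypothetical stand-in for a call to an external resource
    private static async Task<int> FetchAsync(int delayMs, int value)
    {
        await Task.Delay(delayMs);
        return value;
    }

    public static async Task<(int Sum, TimeSpan Elapsed)> AggregateAsync()
    {
        var stopwatch = Stopwatch.StartNew();

        // All three calls are started before any of them is awaited
        var task1 = FetchAsync(300, 1);
        var task2 = FetchAsync(600, 2);
        var task3 = FetchAsync(900, 3);

        int[] results = await Task.WhenAll(task1, task2, task3);
        return (results.Sum(), stopwatch.Elapsed);
    }
}
```

Awaiting each call in turn instead would take at least 1.8 seconds; with WhenAll() it takes a little over 0.9 seconds.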

Here’s another example which takes the result of whichever task completes first:

        public async Task HandleAsync(string str)
        {
            var task1 = Task.Delay(1000).ContinueWith(x => { return 1; });
            var task2 = Task.Delay(2000).ContinueWith(x => { return 2; });
            var task3 = Task.Delay(3000).ContinueWith(x => { return 3; });

            var result = await await Task.WhenAny<int>(task1, task2, task3);
            Console.WriteLine(result);
        }

In this example, WhenAny() suspends execution of the method until any of the tasks completes. The result from the fastest task is taken.

[Screenshot: console output of the WhenAny() example, printing 1]
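One thing to keep in mind is that WhenAny() does not stop the slower tasks; they keep running after the first result has been taken. If the other queries are no longer useful, a CancellationTokenSource can cancel them. A sketch, with QueryAsync() as a hypothetical stand-in for a real query that accepts a cancellation token:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FirstCompletedSketch
{
    // Hypothetical query; Task.Delay() throws TaskCanceledException if the token is cancelled
    private static async Task<int> QueryAsync(int delayMs, int value, CancellationToken token)
    {
        await Task.Delay(delayMs, token);
        return value;
    }

    public static async Task<int> FirstResultAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            var tasks = new[]
            {
                QueryAsync(1000, 1, cts.Token),
                QueryAsync(2000, 2, cts.Token),
                QueryAsync(3000, 3, cts.Token),
            };

            Task<int> first = await Task.WhenAny(tasks);
            cts.Cancel(); // the slower queries are no longer needed

            return await first;
        }
    }
}
```

This only helps if the underlying operations actually observe the token; a query that ignores it will run to completion regardless.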

Note: if you’re looking to do this kind of concurrent fastest-query operation, you might want to look at Akka .NET Routers with Routing Strategies such as ScatterGatherFirstCompleted.