Tag Archives: Akka .NET

Child Per Entity Pattern in Akka .NET

In Akka .NET, actors are very lightweight, and you’re encouraged to use lots of them. It’s thus very common to have a type of entity (e.g. stock symbol, event, session, etc), and an actor dedicated to each instance of that entity.

The source code for this article is available at the Gigi Labs BitBucket Repository. The project is called Entity Per Child (instead of Child Per Entity), because somehow it stuck in my head that way!

Let’s take a practical example. You have an application which receives updates for various different stocks listed on the stock exchange; each of these is represented by a symbol (e.g. MSFT represents Microsoft):

    public class StockUpdate
    {
        public string Symbol { get; }
        public decimal Price { get; }

        public StockUpdate(string symbol, decimal price)
        {
            this.Symbol = symbol;
            this.Price = price;
        }
    }

Updates are sent to a StockCoordinatorActor (for this example we’ll simulate this by just having the ActorSystem itself send these messages):

            using (var actorSystem = ActorSystem.Create("StockActorSystem"))
            {
                var props = Props.Create<StockCoordinatorActor>();
                var coord = actorSystem.ActorOf(props, "StockCoordinatorActor");

                coord.Tell(new StockUpdate("ABC", 1.20m));
                coord.Tell(new StockUpdate("XYZ", 0.59m));
                coord.Tell(new StockUpdate("ABC", 1.21m));
                coord.Tell(new StockUpdate("HBZ", 0.86m));
                coord.Tell(new StockUpdate("FUK", 1.20m));
                coord.Tell(new StockUpdate("XYZ", 0.57m));

                Console.ReadLine();
            }

Now, the StockCoordinatorActor will be responsible for spawning a child actor for each symbol, and directing messages concerning that symbol to that child actor. I see a lot of questions about how to store this mapping in a dictionary, but actually, you don’t need to. Actors can easily access information about their children, so you should use that to your advantage:

    public class StockCoordinatorActor : ReceiveActor
    {
        public StockCoordinatorActor()
        {
            this.Receive<StockUpdate>(Handle, null);
        }

        private void Handle(StockUpdate update)
        {
            var childName = update.Symbol;

            // check if a child with that name exists
            var child = Context.Child(childName);

            // if it doesn't exist, create it
            if (child == ActorRefs.Nobody)
            {
                var props = Props.Create(() => new StockActor(childName));
                child = Context.ActorOf(props, childName);
            }

            // forward the message to the child actor
            child.Tell(update.Price);
        }
    }

The child actor that handles the message will simply write out the price update in this example:

    public class StockActor : ReceiveActor
    {
        private string symbol;

        public StockActor(string symbol)
        {
            this.symbol = symbol;

            this.Receive<decimal>(Handle, null);
        }

        private void Handle(decimal price)
        {
            Console.WriteLine($"{Context.Self.Path} - {this.symbol}: {price} ");
        }
    }

Let’s run this:

akkanet-child-per-entity

I’m writing out the path of the actor that handles the update, to show that these are actually children of the StockCoordinatorActor.

If you’re dealing with a large number of actors, you might want to use a ReceiveTimeout to kill off the child actors after they’ve been idle for a period of time, to keep memory usage within reason. If a new message eventually comes in, the actor will be recreated by the same child creation logic.

This approach is called the Child Per Entity pattern. It’s actually very similar to using a consistent hashing router to assign work to a pool of dedicated actors based on an ID. With Child Per Entity, you have exactly one actor per ID, while with consistent hashing, you get actors handling multiple IDs. Due to the need to map this level of indirection, Child Per Entity is simpler to use for stateful actors.

On Akka .NET Actor Creation

Abstract: this article is a bit of a rant about how actors are created in Akka .NET, and suggests a way of making it just a little bit more manageable. This is written based on my limited knowledge of Akka .NET, and I will be more than happy to stand corrected on the matters I write about.

Actors Need Props

Creating an ActorSystem and actors in Akka .NET is one of the most basic necessities, and it is something you’ll be doing all the time. Once you have an ActorSystem, you can create actors using an IActorRefFactory implementation (i.e. the ActorSystem itself, or an actor’s Context). This requires you to use Props.

            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var props = Props.Create<MyActor>();
                var actor = actorSystem.ActorOf(props);

                Console.ReadLine();
            }

What the hell are Props?

This is explained by Unit 1 Lesson 3 of the Petabridge Akka .NET Bootcamp. Basically, the answer is something like: it’s a recipe for creating actors, but don’t worry about it for now; we’ll use it later.

As a matter of fact, it’s needed mainly for advanced scenarios such as remote deployment and clustering. Most of the time as you’re learning to use Akka .NET, you don’t really care about them.

Creating Props

There are three ways to create Props, all involving some manner of a call to Props.Create().

The first way is to give it a Type.

var props = Props.Create(typeof(MyActor));

This is discouraged by Akka .NET, because it has no type safety and in fact lets you do stupid things like this:

akka-actorcreaton-typeof

The second way is to use a generic form:

var props = Props.Create<MyActor>();

While this is encouraged in the bootcamp, I personally discourage this. This is because while it gives you type safety over the actor type, it doesn’t give you any guarantees with the parameters:

akkanet-props-generic-2

The third way is to pass in a factory method. This is a great way to create Props because it’s the only one that lets you pass dependencies into the actor’s constructor in a type-safe manner (particularly important to use constructor injection if you’re thinking of writing tests against your actors).

var props = Props.Create(() => new MyActor());

Making It Better

In reality, most of the time I don’t care about Props. So why do I have to constantly bother about them? Actually, if we take a closer look at the third way of creating Props, we can wrap them into oblivion:

akkanet-props-expression

See that generic Expression over there? That’s what we need to avoid all this madness. Based on that, we can create a generic method to take care of everything:

    public static class IActorRefFactoryExtensionscs
    {
        /// <summary>
        /// Creates an actor, creating props based on the provided
        /// actor factory method.
        /// </summary>
        /// <typeparam name="T">The actor type.</typeparam>
        /// <param name="actorRefFactory">ActorSystem or actor Context.</param>
        /// <param name="actorFactory">Actor factory method.</param>
        public static IActorRef CreateActor<T>(this IActorRefFactory actorRefFactory,
            Expression<Func<T>> actorFactory) where T : ActorBase
        {
            var props = Props.Create(actorFactory);
            var actor = actorRefFactory.ActorOf(props);
            return actor;
        }
    }

To create an actor directly from the ActorSystem, we now only need to do this:

var actor = actorSystem.CreateActor(() => new MyActor());

…and to do it from inside an actor, it’s just as easy:

var actor = Context.CreateActor(() => new MyActor());

Weird Conventions

This is where the rant begins.

Apart from all the weirdness associated with having to deal with Props in the first place, Unit 1 Lesson 3 of the Petabridge Akka .NET Bootcamp has this little gem that makes my day brighter every time I read it:

How do I make Props?

“Before we tell you how to make Props, let me tell you what NOT to do.

DO NOT TRY TO MAKE PROPS BY CALLING new Props(...). Similar to trying to make an actor by calling new MyActorClass(), this is fighting the framework and not letting Akka’s ActorSystem do its work under the hood to provide safe guarantees about actor restarts and lifecycle management.”

Here’s a similar gem from Unit 1 Lesson 1:

NOTE: When creating Props, ActorSystem, or ActorRef you will very rarely see the new keyword. These objects must be created through the factory methods built into Akka.NET. If you’re using new you might be making a mistake.”

Wait, what? These guys are telling us to call static Create() methods rather than using constructors. These are the same people who told us that using async/await in actors is bad (which has since been corrected). I don’t know, but I bet if you ask anyone who has done OOP before, they’ll tell you that if there’s a mistake, it’s in Akka .NET’s design.

But to really top it all, check out the following comment from Aaron Standard (Petabridge CTO and Akka .NET co-founder) on the Reddit thread about one of his articles (emphasis mine):

Orleans is a piss-poor implementation of the Actor model and breaks a lot of the conventions that make it powerful, aside from having an overall hideous set of programming conventions. But because MSFT released it, people will flock to it like lemmings.

“We’re going to keep working on Akka.NET because there’s a community supporting it now and because we believe in TypeSafe’s vision for actor systems.”

A case of the pot calling the kettle black? Or quoting Confucius, “Don’t complain about the snow on your neighbor’s roof when your own doorstep is unclean.”

In any case, my goal is not to start a flame war but to understand why the Akka .NET API (and codebase) is such a mess. If you look at the source code for ActorSystem, for instance, it does not do anything so particularly complicated that would justify banning constructor calls. In fact, the call to ActorSystem.Create() ends up here:

        private static ActorSystem CreateAndStartSystem(string name, Config withFallback)
        {
            var system = new ActorSystemImpl(name, withFallback);
            system.Start();
            return system;
        }

In fact, although you shouldn’t do this, this code works just as well as what we had before:

            using (var actorSystem = new ActorSystemImpl("MyActorSystem"))
            {
                actorSystem.Start();

                var actor = actorSystem.CreateActor(() => new MyActor());

                Console.ReadLine();
            }

Why is this even public such that I can call it?

Conclusion

The API provided by Akka .NET, particular in ActorSystem and actor creation, is very strange indeed. We are discouraged to do something as trivial a calling a constructor, and have to deal with Props even though we won’t need them most of the time. It is hard to speculate on why this API was written this way without having the developers provide insight on it.

At the very least, aside from pointing out these obvious flaws, this article aims to suggest best practices on how to go about creating Props, and to provide an extension method to hide the existence of Props for the majority of cases where using them directly isn’t really necessary.

Using ReceiveTimeout in Akka .NET

In Akka .NET, you can opt to receive a special message when an actor is idle (i.e. does not receive any message) for a certain period of time. This message is the ReceiveTimeout, and is particularly useful to clean up idle actors when using the Entity Per Child pattern (although various other scenarios exist where this might be used).

The source code for this article is available at the Gigi Labs BitBucket repository.

Let’s consider an ActorSystem with a single actor. The ActorSystem sends a message to this actor every few seconds, using the built-in Akka .NET Scheduler:

            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var actor = actorSystem.ActorOf(
                    Props.Create<SomeActor>(), "SomeActor");

                actorSystem.Scheduler.ScheduleTellRepeatedly(
                    initialDelay: TimeSpan.FromSeconds(1),
                    interval: TimeSpan.FromSeconds(5),
                    receiver: actor,
                    message: "Hello!",
                    sender: ActorRefs.NoSender
                );

                Console.ReadLine();
            }

In the actor itself, we call SetReceiveTimeout() in order to be able to receive a ReceiveTimeout message after the specified idle period elapses. Then, we handle the ReceiveTimeout message just like we would handle any other message.

    public class SomeActor : ReceiveActor
    {
        public SomeActor()
        {
            this.Receive<string>(Handle, null);
            this.Receive<ReceiveTimeout>(Handle, null);

            var timeout = TimeSpan.FromSeconds(3);
            this.SetReceiveTimeout(timeout);
        }

        private void Handle(string msg)
        {
            Console.WriteLine(msg);
        }

        private void Handle(ReceiveTimeout msg)
        {
            Console.WriteLine("Timeout received!");
        }
    }

If we run the application with the code we have so far, we get this:

akkanet-receivetimeout-nokill

Although we are handling the ReceiveTimeout message, we aren’t really doing anything to react to it. As the ReceiveTimeout documentation states, the ReceiveTimeout will keep firing after periods of inactivity.

To disable the ReceiveTimeout altogether, just call SetReceiveTimeout() again, passing in null:

this.SetReceiveTimeout(timeout);

You will often want to kill the actor altogether. In particular when using the Entity Per Child pattern, you will want to kill off actors that aren’t in use to keep your memory footprint low; and since you’ll have a parent actor taking care of routing messages and creating child actors as needed, any new message to that actor will cause it to be recreated.

There are different ways to kill an actor, so just pick one and use it in your ReceiveTimeout handler:

        private void Handle(ReceiveTimeout msg)
        {
            Console.WriteLine("Timeout received!");

            Context.Stop(Context.Self); // stop the actor
        }

Since we’re killing the actor now, any subsequent messages become undeliverable:

akkanet-receivetimeout-kill

Custom Loggers in Akka .NET

Akka .NET supports a flexible logging mechanism that can adapt with various logging providers, as we have seen in my earlier article on logging with Akka .NET. Aside from the default logger that writes to the console, you can plug in various loggers of your own choosing (e.g. NLog), set them up in configuration, and work with them using a common interface.

Sometimes, you may have specific logging requirements that are not covered by any of the existing logging plugins for Akka .NET. In such cases, you would need to write your own custom logger. Unfortunately, the Akka .NET Logging documentation does not explain how to do this at the time of writing this article.

This article is intended to fill this gap, explaining how to write a custom logger for Akka .NET, but also touching upon various topics such as reading custom configuration, actor lifecycle hooks, and cleanup of resources used by the ActorSystem. The source code for this article is available at the Gigi Labs BitBucket repository.

In a Nutshell

Akka .NET Logging is basically a port of Akka logging. Any logger is an actor that receives the following messages: Debug, Info, Warning, Error and InitializeLogger. These are all standard Akka .NET messages, and we will see how to use them in a minute. At the time of writing this article, I’ve come across the following loggers that one can refer to in order to see how custom loggers are built:

Main Program

For the sake of this article, we can go along with the following simple program:

            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var logger = Logging.GetLogger(actorSystem, actorSystem, null);
                logger.Info("ActorSystem created!");

                Console.WriteLine("Press ENTER to exit...");
                Console.ReadLine();
            }

In my earlier Akka .NET logging article, we had done logging only from within actors. The above code shows how you can use the same configured logger directly from the ActorSystem.

We are now going to need a little configuration.

<?xml version="1.0" encoding="utf-8" ?>
<configuration>

  <configSections>

<section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka" />
  </configSections>

  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1" />
  </startup>

  <akka>
    <hocon>
      <![CDATA[ akka { loglevel = DEBUG loggers = ["AkkaNetCustomLogger.Loggers.ConsoleLogger, AkkaNetCustomLogger"] actor { debug { receive = on # log any received message autoreceive = on # log automatically received messages, e.g. PoisonPill lifecycle = on # log actor lifecycle changes event-stream = on # log subscription changes for Akka.NET event stream unhandled = on # log unhandled messages sent to actors } } } ]]>
    </hocon>
  </akka>

</configuration>

Here, we’re turning on all internal Akka .NET logging so that we can automatically get some logging output.

The important thing here is the loggers configuration, where we’re specifying the custom logger that we want Akka .NET to use. This is exactly how we had set up Akka .NET to use NLog in my earlier article, but this time we’re going to use a class called ConsoleLogger (which we have yet to create).

Writing the ConsoleLogger

As I mentioned earlier, any custom logger needs to handle five messages: Debug, Info, Warning, Error, and InitializeLogger. That’s the first thing we’ll set up our ConsoleLogger to do.

    public class ConsoleLogger : ReceiveActor
    {
        public ConsoleLogger()
        {
            Receive<Debug>(e => this.Log(LogLevel.DebugLevel, e.ToString()));
            Receive<Info>(e => this.Log(LogLevel.InfoLevel, e.ToString()));
            Receive<Warning>(e => this.Log(LogLevel.WarningLevel, e.ToString()));
            Receive<Error>(e => this.Log(LogLevel.ErrorLevel, e.ToString()));
            Receive<InitializeLogger>(_ => this.Init(Sender));
        }

        // ...
    }

The actual logging messages will use a common Log() helper method, since they differ only in log level (the operation of writing to the log destination is the same for all). Note that when we convert each of these classes to string, that includes the log level, so we don’t need to write it separately. In the case of the ConsoleLogger, we are passing in the LogLevel merely so we can use a different colour for each level.

The special message we haven’t covered yet is InitializeLogger. When the ActorSystem creates the logger actor, the internal event bus needs to know whether the logger is ready to start accepting messages. It does this by sending the logger actor an InitializeLogger message, and expects a LoggerInitialized message in return:

        private void Init(IActorRef sender)
        {
            using (var consoleColour = new ScopedConsoleColour(ConsoleColor.Green))
                Console.WriteLine("Init");

            sender.Tell(new LoggerInitialized());
        }

Aside from sending back the required message, I’m also logging the init operation itself, so that we can later observe the sequence of events. I am using my trusty ScopedConsoleColour class to change the colour, and then reset it back after the message has been written.

If you don’t send the LoggerInitialized message back, the actor system reports a timeout from initialising the logger, and basically you get no logging. (Well, ironically, the timeout itself is logged… presumably using the DefaultLogger as a fallback.)

akkanet-logging-notinitialized

Now we can implement our Log() helper method:

        private void Log(LogLevel level, string message)
        {
            ConsoleColor colour = ConsoleColor.Gray;

            switch (level)
            {
                case LogLevel.DebugLevel:
                    colour = ConsoleColor.Gray;
                    break;
                case LogLevel.InfoLevel:
                    colour = ConsoleColor.White;
                    break;
                case LogLevel.WarningLevel:
                    colour = ConsoleColor.Yellow;
                    break;
                case LogLevel.ErrorLevel:
                    colour = ConsoleColor.Red;
                    break;
                default: // shouldn't happen
                    goto case LogLevel.InfoLevel;
            }

            using (var consoleColour = new ScopedConsoleColour(colour))
                Console.WriteLine(message);
        }

Here we’re doing nothing but using a different colour per level, and writing the message to the console (which is pretty much what StandardOutLogger does). Remember, the level is already part of the message, so we don’t need to format it into the output message. (And if you’re outraged at the use of goto above, I suggest you read about goto case in C#.)

Actor Life Cycle Hooks

If your custom logger depends on external resources (which is most likely the case) such as the filesystem or a database, you will want to initialise those resources when the logger actor is created, and clean them up when it is destroyed. That work typically goes into actor life cycle hooks, i.e. overridable methods that allow you to run arbitrary code when an actor starts, stops, or restarts.

We don’t need to do this for ConsoleLogger, so we will simply log the start and stop operation instead. However, we will use these hooks more realistically when we implement the FileLogger.

        protected override void PreStart()
        {
            base.PreStart();

            using (var consoleColour = new ScopedConsoleColour(ConsoleColor.Green))
                Console.WriteLine("PreStart");
        }

        protected override void PostStop()
        {
            using (var consoleColour = new ScopedConsoleColour(ConsoleColor.Green))
                Console.WriteLine("PostStop");

            base.PostStop();
        }

We can now run this and see the logging in action:

akkanet-logging-console

Now, for something interesting, put a breakpoint inside PostStop(), and press ENTER to cause the program to continue and terminate. One would expect PostStop() to run as the ActorSystem is shutting down. But in fact, it doesn’t.

Next, go back to the main program, and add a second Console.ReadLine() at the end:

            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var logger = Logging.GetLogger(actorSystem, actorSystem, null);
                logger.Info("ActorSystem created!");

                Console.WriteLine("Press ENTER to exit...");
                Console.ReadLine();
            }

            Console.ReadLine();

Run it again, and when you press ENTER, the breakpoint is hit and the PostStop event is written to the console while waiting for the second ENTER:

akkanet-logging-poststop

When we disposed the ActorSystem earlier, the program terminated before the ActorSystem had the chance to do its cleanup work. It appears that when the ActorSystem shuts down, it doesn’t clean resources right away; most likely it is done using asynchronous messaging just like in the rest of Akka .NET. For this reason, in your program’s stopping code, you might want to wait a little bit between destroying the ActorSystem and actually letting the application terminate, in order to let it gracefully free its resources.

Writing the FileLogger

We will now write a second custom logger, this time one that writes to file.

First, change the HOCON configuration to use the new logger.

          loggers = ["AkkaNetCustomLogger.Loggers.FileLogger, AkkaNetCustomLogger"]

Next, let’s write the FileLogger. As with the ConsoleLogger, we need to handle the same five messages:

    public class FileLogger : ReceiveActor
    {
        private StreamWriter writer;

        public FileLogger()
        {
            ReceiveAsync<Debug>(async e => await this.LogAsync(e.ToString()));
            ReceiveAsync<Info>(async e => await this.LogAsync(e.ToString()));
            ReceiveAsync<Warning>(async e => await this.LogAsync(e.ToString()));
            ReceiveAsync<Error>(async e => await this.LogAsync(e.ToString()));
            Receive<InitializeLogger>(_ => Sender.Tell(new LoggerInitialized()));
        }

        // ...
    }

The logger keeps a reference to a StreamWriter, which wraps the file we will be writing to.

The LogAsync() method simply dumps the messages into that StreamWriter (remember, streams and toilets must always be flushed):

        private async Task LogAsync(string message)
        {
            await this.writer.WriteLineAsync(message);
            await this.writer.FlushAsync();
        }

We can open the file itself either during the InitializeLogger handler or in PreStart(). Let’s use a fixed filename for now:

        protected override void PreStart()
        {
            base.PreStart();

            string filePath = "log.txt";
            var fileStream = File.OpenWrite(filePath);
            this.writer = new StreamWriter(fileStream);
        }

We can then do the cleanup in PostStop():

        protected override void PostStop()
        {
            // dispose the StreamWriter, and implicitly the
            // underlying FileStream with it
            this.writer.Dispose();

            base.PostStop();
        }

We only need to Dispose() our StreamWriter; doing that will automatically also close the underlying FileStream.

Now, while this is enough to log to file, there is a problem. We can’t actually use another program to read the log file while the program is running:

akkanet-logging-cantread

We can fix this by changing the way we open the file.

        protected override void PreStart()
        {
            base.PreStart();

            string filePath = "log.txt";
            var fileStream = File.Open(filePath, FileMode.Append, FileAccess.Write, FileShare.Read);
            this.writer = new StreamWriter(fileStream);
        }

You’ll see that the log messages are now written to file:

akkanet-logging-fileoutput

However, there is another problem. If you press ENTER to close the program, the following happens:

akkanet-logging-race

Upon further inspection, it seems that logging messages are coming in around the same time that PostStop() is running, causing a race condition on the underlying resource. I’ve opened a bug report for this, but until this is sorted, you can flush synchronously as a workaround:

        private async Task LogAsync(string message)
        {
            await this.writer.WriteLineAsync(message);
            this.writer.Flush();
        }

So, if there is this problem, how do existing logging adapters that have a file-based component (e.g. NLog) do their cleanup? Well, I’ve checked a few, and it seems they don’t.

Loading Custom Configuration

We’ve managed to write a file logger, but we’re using a fixed filename. How can we make it configurable?

It turns out we can just add an arbitrary setting anywhere in the HOCON configuration, and read it from inside the actor. So, let’s add this:

        akka
        {
          loglevel = DEBUG
          loggers = ["AkkaNetCustomLogger.Loggers.FileLogger, AkkaNetCustomLogger"]
          logfilepath = "logfile.txt"

We can get to the setting we want using the configuration system in Akka .NET:

        protected override void PreStart()
        {
            base.PreStart();

            string filePath = "log.txt";

            filePath = Context.System.Settings.Config
                .GetString("akka.logfilepath", filePath);

            var fileStream = File.Open(filePath, FileMode.Append, FileAccess.Write, FileShare.Read);
            this.writer = new StreamWriter(fileStream);
        }

Basically we’re reading the “akka.logfilepath” key from the HOCON config. We’re also passing in filePath as a default in case the setting is not found.

Running Multiple Loggers

So far we’ve been using either one logger or the other. But if you notice, the loggers configuration in HOCON is actually an array. Thus there is nothing stopping us from using multiple loggers at once:

        akka
        {
          loglevel = DEBUG
          loggers = ["AkkaNetCustomLogger.Loggers.FileLogger, AkkaNetCustomLogger",
                     "AkkaNetCustomLogger.Loggers.ConsoleLogger, AkkaNetCustomLogger"]
          logfilepath = "logfile.txt"

Yes, it works:

akkanet-logging-multiple

Akka .NET IActorRef: Local or Remote?

Akka .NET supports location transparency. When you use an IActorRef, your application shouldn’t care whether that actor is running on the same machine or somewhere else on the network. You can change where an actor runs as a matter of configuration, and your application will never know the difference.

Although an application shouldn’t depend on the physical location of an actor to perform its logic, knowing where an actor is running can be useful (e.g. when troubleshooting issues).

akkanet-islocal

There is an IsLocal property that you can use to tell whether an actor is running locally or remotely. However, this is not immediately accessible from the IActorRef. Instead, you need to cast your IActorRef to an InternalActorRefBase to be able to use it:

(localChatActor as InternalActorRefBase).IsLocal

If you’re working with an ActorSelection (which you probably are if you’re using remote actors), then you will first want to get to the particular IActorRef of the actor. You can do this via the ActorSelection‘s Anchor property.

(remoteChatActor.Anchor as InternalActorRefBase).IsLocal

This will allow you to check whether an actor is running locally or remotely. But remember: use this only for diagnostic purposes, and don’t make your application code dependent on it.

Replaying Chess Games using Akka.Persistence Event Sourcing

Introduction: Event Sourcing

In most modern games, it is conventional wisdom that you should save your progress regularly, lest you take a wrong turn and get mauled:

persistence-starcraft-save

Software is no different. If you’re running an actor system in memory, you risk losing state if anything happens to the actors or to the whole actor system.

It is thus important to save the state of your actors, but how?

In message-based systems such as Akka .NET, a popular approach towards recovery is to save messages as they arrive, and in case of failure, simply handle them again in the same order to restore the last state. This is known as event sourcing.

Akka .NET provides event sourcing support thanks to the Akka.Persistence module, as we shall see shortly.

The source code for this article is available at the Gigi Labs BitBucket repository.

Chess Scenario

Chess is a great example with which to demonstrate event sourcing because a chess game consists of a sequential set of moves which can be represented using a specific notation. We can save these moves and later replay the game.

It is also very easy to draw a chess board in a console application. It’s just an 8×8 grid with the pieces on it. We can represent the various chess pieces using different letters, and use uppercase and lowercase (instead of white and black) to distinguish between the two players’ pieces, as GNU Chess does:

gnuchess

Chess can get quite complex and I really don’t want to get lost in the details (since this article is about Akka.Persistence), so we’ll make a number of assumptions as follows to keep things simple:

  • No validation. Pieces can be moved anywhere on the board.
  • Both players use the same console window and take it in turns.
  • No game state (i.e. you can never win or lose).
  • Input will be of the format: move <from> to <to>, for example move e2 to e4.

In other words, we’re not really building a chess game. We’re just emulating the board and the movement so that we can store moves and replay them later.

Prerequisites

To follow along, install the following NuGet packages:

Install-Package Akka
Install-Package Akka.Persistence -pre
Install-Package Akka.Persistence.SqlServer -pre

Akka.Persistence seems to be prerelease at the time of writing this article, so you will need the -pre flag.

Akka.Persistence is very flexible in terms of where messages are saved. In this example we’re going to use SQL Server, but there are a whole load of storage implementations. Just look up “Akka.Persistence” in NuGet and you’ll see the available options:

akka.persistence-nuget

System Overview

akka-persistence-chess

We have a ChessGameActor that holds the game state (i.e. the chess board). I was originally going to use a string[] for this, but since we need to update individual characters, the immutable nature of strings becomes a problem. We need to use a 2-dimensional char array instead.

    public class ChessGameActor : ReceiveActor
    {
        private Guid gameId;
        private IActorRef rendererActor;

        private char[][] chessBoard = new char[][]
        {
            "rnbqkbnr".ToCharArray(),
            "pppppppp".ToCharArray(),
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "        ".ToCharArray(),
            "PPPPPPPP".ToCharArray(),
            "RNBQKBNR".ToCharArray()
        };

        // ...
    }

We also have a ChessBoardDrawingActor responsible for actually drawing the chess board. The ChessGameActor has a reference to it so that it can ask it to redraw the board when someone moves a piece.

The details of how ChessBoardDrawingActor is implemented are omitted for brevity (refer to the source code if you need it), but it basically just handles DrawChessBoardMessages coming from the ChessGameActor:

    public class ChessBoardDrawingActor : ReceiveActor
    {
        public ChessBoardDrawingActor()
        {
            this.Receive<DrawChessBoardMessage>(m => Handle(m));
        }

        public void Handle(DrawChessBoardMessage message)
        {
            Console.Clear();

            var chessBoard = message.ChessBoard;

            // ...
        }

Although you technically could do this from the ChessGameActor itself, I consider it good practice to separate state/logic from presentation. Reminiscent of the MVC pattern, this makes it easy to support various output devices (e.g. GUI window, web, mobile, etc) without having to change the core of your game.

The DrawChessBoardMessage is simply a copy of the chessboard:

    public class DrawChessBoardMessage
    {
        public char[][] ChessBoard { get; }

        public DrawChessBoardMessage(char[][] chessBoard)
        {
            this.ChessBoard = chessBoard;
        }
    }

Although we could micro-optimise this by sending a diff instead (i.e. old position to erase, and new position to draw) as we do in the Akka.Remote multiplayer game example, the data here is so small as to carry negligible overhead. Besides, it’s common practice in games to just redraw everything (which may not be the fastest approach, but complex environments make tracking changes impossible).

The main program is responsible for creating the actor system, along with these two actors:

        static void Main(string[] args)
        {
            Console.Title = "Akka .NET Persistence Chess Example";

            using (var actorSystem = ActorSystem.Create("Chess"))
            {
                var drawingProps = Props.Create<ChessBoardDrawingActor>();
                var drawingActor = actorSystem.ActorOf(drawingProps, "DrawingActor");

                Guid gameId = Guid.Parse("F56079D3-4625-409A-B734-C9BDEBA6D7FA");
                var gameProps = Props.Create<ChessGameActor>(gameId, drawingActor);
                var gameActor = actorSystem.ActorOf(gameProps, "GameActor");

                HandleInput(gameActor);

                Console.ReadLine();
            }
        }

The input handling logic expects to receives moves in the format move <from> to <to>; once it extracts the from and to locations, it sends a MoveMessage to the ChessGameActor.

        static void HandleInput(IActorRef chessGameActor)
        {
            string input = string.Empty;

            while (input != null) // quit on Ctrl+Z
            {
                input = Console.ReadLine();

                var tokens = input.Split();

                switch (tokens[0]) // check first word
                {
                    case "move": // e.g. move e2 to e4
                        {
                            string from = tokens[1];
                            string to = tokens[3];
                            var message = new MoveMessage(from, to);

                            chessGameActor.Tell(message);
                        }
                        break;
                    default:
                        Console.WriteLine("Invalid command.");
                        break;
                }
            }
        }

In fact, a MoveMessage is simply a combination of from and to locations:

    public class MoveMessage
    {
        public string From { get; }
        public string To { get; }

        public MoveMessage(string from, string to)
        {
            this.From = from;
            this.To = to;
        }

        public override string ToString()
        {
            return $"move {this.From} to {this.To}";
        }
    }

However, these locations are still in the format entered by the user (e.g. e4). When the GameActor receives a MoveMessage, it must first translate the locations into indices in the 2-dimensional array that we’re using as a chess board. This is done in a method called TranslateMove() which does some funky ASCII manipulation…

        private Point TranslateMove(string move)
        {
            // e.g. e4: e is the column, and 4 is the row

            char colChar = move[0];
            char rowChar = move[1];

            int col = colChar - 97;
            int row = 8 - (rowChar - '0');

            return new Point(col, row);
        }

…and returns an instance of a Point class. Point is your typical 2D coordinate.

    public class Point
    {
        public int X { get; }
        public int Y { get; }

        public Point(int x, int y)
        {
            this.X = x;
            this.Y = y;
        }
    }

Once the GameActor translates these coordinates, it can update the state of the chess board, and send a DrawChessBoardMessage to the ChessBoardDrawingActor to redraw the chess board.

        public void Handle(MoveMessage message)
        {
            var fromPoint = this.TranslateMove(message.From);
            var toPoint = this.TranslateMove(message.To);

            char piece = this.chessBoard[fromPoint.Y][fromPoint.X];

            chessBoard[fromPoint.Y][fromPoint.X] = ' '; // erase old location
            chessBoard[toPoint.Y][toPoint.X] = piece; // set new location

            this.RedrawBoard();
        }

        private void RedrawBoard()
        {
            var drawMessage = new DrawChessBoardMessage(this.chessBoard);
            this.rendererActor.Tell(drawMessage);
        }

Saving Messages using Akka.Persistence Journaling

In order to be able to recover our actor’s state (in this case, replay chess games one move at a time), we need to store those MoveMessages as they arrive in our ChessGameActor. We can do this using the built-in functionality of Akka.Persistence.

The first thing we need to do is have our ChessGameActor inherit from ReceivePersistentActor (instead of ReceiveActor):

public class ChessGameActor : ReceivePersistentActor

When we do this, we will be required to provide a property called PersistenceId. Fortunately, we’re passing in a Guid called gameId to our actor, so we can use that:

        public override string PersistenceId
        {
            get
            {
                return this.gameId.ToString("N");
            }
        }

        public ChessGameActor(Guid gameId, IActorRef rendererActor)
        {
            this.gameId = gameId;
            // ...
        }

We’ll see what this is for in a minute. Let’s complete our constructor:

        public ChessGameActor(Guid gameId, IActorRef rendererActor)
        {
            this.gameId = gameId;
            this.rendererActor = rendererActor;

            this.RedrawBoard();

            this.Command<MoveMessage>(PersistAndHandle, null);
        }

In the constructor, we store the game ID and a reference to the ChessBoardDrawingActor. We draw the initial board (before anyone has moved), and then we set up our message handling.

In a ReceivePersistentActor, we use Command<T>() instead of Receive<T>() to set up our message handlers. We can’t use Receive<T>() because of some ambiguity between base class methods. The null value passed in is similarly to prevent ambiguities between overloads.

In the PersistAndHandle() method, we call the built-in Persist() method to save the message and call a handling method after the save is successful:

        public void PersistAndHandle(MoveMessage message)
        {
            Persist(message, persistedMessage => Handle(persistedMessage));
        }

The Handle() method is the same one we’ve seen before that handles the MoveMessage.

We could have done all this in one step within the Command<T>() call, as you can see in Petabridge’s Akka.Persistence blog article. However, I’m not a big fan of doing a lot of logic in nested lambdas, as they can quickly get out of hand for non-trivial scenarios.

Now we just need a little configuration to tell Akka.Persistence where it should store the messages whenever we call Persist():

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka" />
  </configSections>
  
  <startup> 
      <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1" />
  </startup>
    
  <akka>
    <hocon>
      <![CDATA[
      akka.persistence
      {
        journal
        {
          plugin = "akka.persistence.journal.sql-server"
          sql-server
          {
              class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
              schema-name = dbo
              auto-initialize = on
              connection-string = "Data Source=.\\SQLEXPRESS;Database=AkkaPersistenceChess;Integrated Security=True"
          }
        }
      }
      ]]>
    </hocon>
  </akka>
</configuration>

Let’s run the program and enter a few moves that we can later replay. These are the moves for fool’s mate:

move f2 to f3
move e7 to e5
move g2 to g4
move d8 to h4

Here is what it looks like, just before the last move:

akka-persistence-chess-output

If you check your database now, you’ll see a couple of tables. The EventJournal one has an entry for each move you played (i.e. for each message that was handled):

akka-persitence-chess-journal-table

The values in the PersistenceId column match the game ID, which is what we provided in the ChessGameActor’s PersistenceId property. If we wanted to save the progress of a different game, we would pass in a different game ID (and thus PersistenceId) to our ChessGameActor.

Each PersistenceId should be unique across the across the actor system, and there should be only one instance of a persistence actor with that PersistenceId running at any given time. Doing otherwise would compromise the state saved in the database.

Recovering State

In our ChessGameActor’s constructor, we can use the Recover<T>() method to replay messages from the persistence store and recover our state (i.e. do event sourcing), before we begin receiving new messages.

        public ChessGameActor(Guid gameId, IActorRef rendererActor)
        {
            this.gameId = gameId;
            this.rendererActor = rendererActor;

            this.RedrawBoard();

            this.Recover<MoveMessage>(RecoverAndHandle, null);
            this.Command<MoveMessage>(PersistAndHandle, null);
        }

In this case, we’ll handle the recovered messages as normal, but we will also introduce an artificial delay so that the player can actually watch each move being replayed.

        public void RecoverAndHandle(MoveMessage message)
        {
            Handle(message);
            Thread.Sleep(2000);
        }

If we run the game again now, we can watch the moves being replayed, and then we can continue playing where we left off.

akka-persistence-chess-replay-output

Summary

In this article, we have learned how Akka.Persistence supports event sourcing. This is done as follows:

  1. Actors wanting to save messages should inherit from ReceivePersistentActor.
  2. They must supply a PersistenceId which is unique and which will be used to associate saved messages with this particular actor’s state (or that of any subsequent incarnations).
  3. Use Command<T>() instead of Receive<T>() for message handling.
  4. Use Persist() to save messages before handling them.
  5. Use Recover() to replay messages until the actor’s last state is restored.

The particular approach we have seen is called journaling, and it is only one feature of Akka.Persistence. This may be enough for chess games that typically last for not more than 30-40 moves. But in many other use cases with large data flows, the journal may grow a lot and it can take a while to restore state. Akka.Persistence supports a snapshot feature to help mitigate this problem.

Asynchronous and Concurrent Processing in Akka .NET Actors

Yesterday, I wrote a long article about asynchronous processing in Akka .NET actors, the dangers of async void methods, and PipeTo().

The article was written as a result of outdated Akka .NET documentation which claimed that async/await was evil in Akka .NET actors and recommended using PipeTo() instead (the documentation has since been updated). This was further exacerbated by the confusion between asynchronous and concurrent processing in the documentation.

I would like to thank Aaron Stannard and Roger Alsing from the Akka .NET team who, via separate channels, clarified a lot of the confusion. This new article covers asynchronous and concurrent processing in Akka .NET actors as a result.

Asynchronous vs Concurrent Processing

A lot of people confuse asynchronous and concurrent processing, more so given that you can do both with .NET Tasks.

This is asynchronous:

        static async Task RunAsync()
        {
            await Task.Delay(1000);
        }

RunAsync begins executing, but is suspended waiting for some external operation to occur (typically I/O such as reading from a database or a REST service). It can resume execution when the operation completes, but can’t continue doing other things in the meantime.

This, on the other hand, is concurrent:

        static async Task RunAsync()
        {
            Task.Delay(1000);
        }

Because the task is not awaited, the method proceeds with its execution while the task is running. In fact, we get a nice big warning when we do this in a method marked as async:

akkanet-async-not-awaited

If we extend our example just a little bit, we can understand the behaviour better:

        static Task RunAsync()
        {
            Task.Delay(1000)
                .ContinueWith(x => Console.WriteLine("After Delay"));
            Console.WriteLine("End of RunAsync()");
        }

The output is as follows:

End of RunAsync()
After Delay

Now, I could go on about definitions of synchrony and asynchrony, concurrency and parallelism, interleaving, and task-switching. But that would bore you to tears, and it’s really not the point here.

The important thing is to realise that despite using very similar C# syntax, we’re doing two very different things here. And I need to make this clear because PipeTo() is really targeted at concurrent processing, although it is described within the context of asynchrony in Akka .NET documentation.

async/await in actors

You can do async/await in an actor by using the ReceiveActor‘s ReceiveAsync() method:

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            Console.WriteLine($"Begin {str}");

            await Task.Delay(2000);

            Console.WriteLine($"End {str}");
        }
    }

This is perfectly valid, and you will most likely resort to this when you need a guarantee on message order. In such situations, you don’t want to start processing the next message while waiting for an I/O operation to complete, as it is possible for the results of an older message to overwrite those of a newer message.

It is also very useful for sequential steps in I/O operations that depend directly on each other, such as when talking to a database:

        public async Task HandleAsync(string str)
        {
            const string connStr = @"Data Source=.\SQLEXPRESS;Database=test;Integrated Security=true";
            using (var conn = new SqlConnection(connStr))
            {
                await conn.OpenAsync();

                const string sql = "select * from person;";

                using (var command = new SqlCommand(sql, conn))
                using (var reader = await command.ExecuteReaderAsync())
                {
                    while (await reader.ReadAsync())
                    {
                        string id = reader["id"].ToString();
                        string name = reader["name"].ToString();

                        Console.WriteLine($"{id} {name}");
                    }
                }
            }
        }

However, this comes at a cost. First, there is a performance impact when compared to synchronous execution, because the Akka .NET pipeline needs to carry the current context across asynchronous steps.

Secondly, the actor will not be able to process the next message until the current one is finished. Sometimes, this is exactly what you want, such as when you need a guarantee on message order. In such situations, you don’t want to start processing the next message while waiting for an I/O operation to complete, as it is possible for the results of an older message to overwrite those of a newer message. But if your message processing does not depend on prior state, you will get much more throughput if you run tasks concurrently and use PipeTo() to collect the results (more on this later).

Using Ask()

Ask() lets you do request/response between actors:

    public class ServiceActor : ReceiveActor
    {
        public ServiceActor()
        {
            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            await Task.Delay(2000);
            Sender.Tell(str + " done");
        }
    }

    public class MyActor : ReceiveActor
    {
        private IActorRef serviceActor;

        public MyActor(IActorRef serviceActor)
        {
            this.serviceActor = serviceActor;

            this.ReceiveAsync<string>(HandleAsync);
        }

        public async Task HandleAsync(string str)
        {
            Console.WriteLine($"Begin {str}");

            var result = await this.serviceActor.Ask(str);

            Console.WriteLine($"End {result}");
        }
    }

Here is the output for this:

akkanet-async-ask

The approach above is something you typically want to avoid, for the same reason outlined in the previous section. If your actor is waiting for a response, it can’t process other messages in the meantime. Most of the time you should spawn a concurrent task as shown in the documentation, unless you have a good reason for not wanting to process the next message before the current one has finished. Try to design your system in a push fashion, rather than request/response.

Concurrent Execution and PipeTo()

If you have no reason to process messages in a strictly sequential manner, then you can do long-running tasks and I/O operations in a spawned task.

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.Receive<string>(x => Handle(x));
        }

        public void Handle(string str)
        {
            Task.Run(async () =>
            {
                Console.WriteLine($"Begin {str}");

                await Task.Delay(2000);

                Console.WriteLine($"End {str}");
            });
        }
    }

Because of this, the actual processing you do within the task will be interleaved, as in yesterday’s article:

akkanet-async-concurrent

But this is okay, because we’re working on the assumption that the messages don’t need to be processed strictly in sequence.

Now if you want to send the result of your concurrent tasks somewhere, you can do that with PipeTo():

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.Receive<string>(x => Handle(x));
            this.Receive<int>(x => Handle(x));
        }

        public void Handle(string str)
        {
            Task.Run(async () =>
            {
                Console.WriteLine($"Begin {str}");

                await Task.Delay(2000);

                Console.WriteLine($"End {str}");

                return 42;
            }).PipeTo(Self);
        }

        public void Handle(int result)
        {
            Console.WriteLine($"Got result: {result}");
        }
    }

The result of the concurrent operation is sent to the actor you specify (in this case to itself) and processed as any other message in its mailbox. You can also do post-processing (e.g. check HTTP status code after an HTTP GET operation) by adding a ContinueWith(); see the PipeTo() article on the Petabridge blog for an example.

More Advanced Concurrent Operations

Given that you can use both tasks and async/await in your actors, you can use all of the typical patterns you would normally use with the Task Parallel Library (TPL).

Here’s an example representing when you’d aggregate data from multiple external resources:

    public class MyActor : ReceiveActor
    {
        public MyActor()
        {
            this.ReceiveAsync<string>(x => HandleAsync(x));
        }

        public async Task HandleAsync(string str)
        {
            var task1 = Task.Delay(1000).ContinueWith(x => { return 1; });
            var task2 = Task.Delay(2000).ContinueWith(x => { return 2; });
            var task3 = Task.Delay(3000).ContinueWith(x => { return 3; });

            var results = await Task.WhenAll<int>(task1, task2, task3);
            var sum = results.Sum();
            Console.WriteLine(sum);
        }
    }

WhenAll() will wait for all the tasks to complete before the method can proceed with its execution. Here’s the output:

akkanet-async-whenall

Here’s another example which takes the result of whichever task completes first:

        public async Task HandleAsync(string str)
        {
            var task1 = Task.Delay(1000).ContinueWith(x => { return 1; });
            var task2 = Task.Delay(2000).ContinueWith(x => { return 2; });
            var task3 = Task.Delay(3000).ContinueWith(x => { return 3; });

            var result = await await Task.WhenAny<int>(task1, task2, task3);
            Console.WriteLine(result);
        }

In this example, WhenAny() suspends execution of the method until any of the tasks completes. The result from the fastest task is taken.

akkanet-async-whenany

Note: if you’re looking to do this kind of concurrent fastest-query operation, you might want to look at Akka .NET Routers with Routing Strategies such as ScatterGatherFirstCompleted.

In-Depth Async in Akka .NET: Why We Need PipeTo()

Update 21st August 2016: I wrote this article based on outdated Akka .NET documentation that discouraged async/await within actors and suggested using PipeTo() instead. Akka .NET now does support async/await (thanks to the ReceiveAsync() method), and PipeTo() is not a replacement for it. Aaron Stannard (in a comment on this post) and Roger Alsing (on Reddit) from the Akka .NET team were very prompt in correcting various misconceptions, and Aaron Stannard has since updated the Petabridge blog post about PipeTo(). See my followup post for the latest best practices.

Tasks and the more recent async/await syntactic sugar have been a blessing for .NET developers aiming to keep their applications responsive despite increasing requirements for I/O and CPU-intensive requests.

Thus it was really odd for me to learn that Akka .NET, an emergent framework for distributed computing, not only does not support async/await directly within actors, but actually discourages its use (going as far as calling them “code smell”).

In fact, they implemented this PipeTo() workaround that you need to use, sending the result of a task to an actor for processing. You can’t use async/await; you have to resort to the old ContinueWith() way of chaining tasks if you want to do any post-execution logic. If you’ve worked with ContinueWith() in the past, you’ll know it can get ugly really fast.

Why is it such a problem to have elegant asynchrony in our actors, seeing how competitor Microsoft Orleans has no problem with it? As Natan Vivo said in the comments of The Top 7 Mistakes Newbies Make with Akka.NET:

“The fact I decided to use DbCommand.ExecuteNonQueryAsync() instead of DbCommand.ExecuteNonQuery() shouldn’t force me to break a single message into multiple messages with PipeTo.”

Update 20th August 2016: Thanks to the Reddit user who brought to my attention that there actually is proper async support (though apparently not yet documented anywhere). Use the ReceiveActor’s ReceiveAsync() method.

Why Akka .NET Discourages async/await

To learn why awaiting in an actor is bad, let’s break the rules and do it.

    public class BusyActor : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            Console.WriteLine($"Begin processing {message.ToString()}");

            Thread.Sleep(2000);

            Console.WriteLine($"End processing {message.ToString()}");
        }
    }

I have this example actor. For now it’s doing synchronous stuff, sleeping for a couple of seconds and writing something before and after so we can see the behaviour.

        static void Main(string[] args)
        {
            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var actor = actorSystem.ActorOf(Props.Create<BusyActor>(), "BusyActor");

                actor.Tell("Task 1");
                actor.Tell("Task 2");
                actor.Tell("Task 3");

                Console.ReadLine();
            }
        }

The main program simply creates the actor system and the actor, and then sends it three messages in succession.

akkanet-async-synchronous-output

As you can see, the messages are handled sequentially and there is no overlap.

Now let’s change the actor to work asynchronously instead:

    public class BusyActor : UntypedActor
    {
        protected override async void OnReceive(object message)
        {
            Console.WriteLine($"Begin processing {message.ToString()}");

            await Task.Delay(2000);

            Console.WriteLine($"End processing {message.ToString()}");
        }
    }

Run it again…

akkanet-async-async-output

What happened here? All three messages were processed in quick succession, and they have been interleaved. This is very bad, and in fact we were warned about it. Quoting the questions on the official PipeTo() sample:

“Await breaks the “actors process one message at a time” guarantee, and suddenly your actor’s context might be different. Variables such as the Sender of the previous message may be different, or the actor might even be shutting down when the await call returns to the previous context.”

Why Processing Messages Asynchronously Causes Interleaving

We can learn a lot about how actors process messages by investigating the Akka .NET source code. This method in Mailbox.cs seems to be more or less where actors begin to process their messages:

        private void ProcessMailbox(int left, long deadlineTicks)
        {
            while (ShouldProcessMessage())
            {
                Envelope next;
                if (!TryDequeue(out next)) return;

                DebugPrint("{0} processing message {1}", Actor.Self, next);

                // not going to bother catching ThreadAbortExceptions here, since they'll get rethrown anyway
                Actor.Invoke(next);
                ProcessAllSystemMessages();
                if (left > 1 && (Dispatcher.ThroughputDeadlineTime.HasValue == false || (MonotonicClock.GetTicks() - deadlineTicks) < 0))
                {
                    left = left - 1;
                    continue;
                }
                break;
            }
        }

From Actor.Invoke(), there is a succession of method calls that ends in a method called Receive() in UntypedActor.cs:

        protected sealed override bool Receive(object message)
        {
            OnReceive(message);
            return true;
        }

Our OnReceive() method, where implement our message-handling logic for our actors, is subsequently called.

Now, the code above may look confusing, but the point here is not to understand what it’s doing exactly. Take a closer look. The methods in the call stack are mostly void (or otherwise returning simple types). There are no Tasks to be seen anywhere.

What does this mean for us? It means that we’re doing something very bad when we declare our message handler as async void.

Understanding async void

In order to better understand why the approach we took earlier will never work, it’s best to look at a much simpler example:

    class Program
    {
        static void Main(string[] args)
        {
            RunAll();
            Console.ReadLine();
        }

        static void RunAll()
        {
            RunJob("Job 1");
            RunJob("Job 2");
            RunJob("Job 3");
        }

        static void RunJob(string str)
        {
            Console.WriteLine("Start " + str);

            Thread.Sleep(2000);

            Console.WriteLine("End " + str);
        }
    }

Here we’ve reproduced the earlier scenario, but with no Akka .NET. And with the synchronous implementation, it works just fine:

akkanet-async-taskasync-output

Let’s change RunJob() to run asynchronously:

        static async void RunJob(string str)
        {
            Console.WriteLine("Start " + str);

            await Task.Delay(2000);

            Console.WriteLine("End " + str);
        }

When we run it, the following happens:

akkanet-async-taskasync2-output

This is exactly the same interleaving problem we had with Akka .NET, except that this time we have no Akka .NET.

The real reason why we have this problem is due to an incorrect use of asynchrony. As you can read in Stephen Cleary’s MSDN Magazine article, “Async/Await – Best Practices in Asynchronous Programming” (March 2013), async void methods can be pretty dangerous to work with. When you call an async void method, you have two main problems: you have no way of awaiting completion of the method, and exceptions can bring the whole application down.

But here, we have also seen a third problem: that the method effectively exits when you await, returning execution control to the caller. In Akka .NET, this means that the next message will begin processing while the current one hasn’t finished yet.

async void methods should be restricted to methods at the beginning of the call chain (such as event handlers and WPF command handlers). You can’t sneak asynchrony into an otherwise synchronous call stack by introducing an async void. If you do async, it has to be all the way.

So it really seems that the problem with having asynchronous actor logic is simply that Akka .NET was never really designed to work with asynchronous methods.

Asynchrony in Akka .NET with PipeTo()

It should be clear by now that doing async/await in actors is not an option. So how do we go about doing our asynchronous work? We do that by using the PipeTo() pattern (because in Akka .NET, everything is called a pattern).

Let’s go back to our original example with the BusyActor. We left off with this code:

    public class BusyActor : UntypedActor
    {
        protected override async void OnReceive(object message)
        {
            Console.WriteLine($"Begin processing {message.ToString()}");

            await Task.Delay(2000);

            Console.WriteLine($"End processing {message.ToString()}");
        }
    }

Now, we need to refactor this to do the asynchronous operation (in this case Task.Delay()) in a fire-and-forget manner, and send the result as a separate message to an actor. We’re going to need separate messages for this:

    public class TaskMessage
    {
        public string Message { get; }

        public TaskMessage(string message)
        {
            this.Message = message;
        }

        public override string ToString()
        {
            return this.Message;
        }
    }

    public class ResultMessage
    {
        public string Message { get; }

        public ResultMessage(string message)
        {
            this.Message = message;
        }

        public override string ToString()
        {
            return this.Message;
        }
    }

Since our message handling is going to grow a little, UntypedActor is no longer suitable for what we need. Instead, we’ll refactor BusyActor as follows:

    public class BusyActor : ReceiveActor
    {
        public BusyActor()
        {
            this.Receive<TaskMessage>(m => Handle(m));
            this.Receive<ResultMessage>(m => Handle(m));
        }

        public void Handle(TaskMessage message)
        {
            Console.WriteLine($"Begin processing {message.ToString()}");

            Task.Delay(2000)
                .ContinueWith(x => new ResultMessage(message.Message),
                    TaskContinuationOptions.AttachedToParent
                    & TaskContinuationOptions.ExecuteSynchronously)
                .PipeTo(Self);
        }

        public void Handle(ResultMessage message)
        {
            Console.WriteLine($"End processing {message.ToString()}");
        }
    }

Similarly to the official example (which shows how to do an HTTP GET request within an actor), we are firing off an asynchronous request but not awaiting it. This happens in fire-and-forget manner as far as the actor is concerned. When the asynchronous operation is done, we create a new message and send it to the same actor so that he can log the end of the task.

Note that we have those two TaskContinuationOptions settings. You can read more about them in the official PipeTo() blog post, but the point I want to make here is that you need to remember to include them, and this makes this approach pretty error-prone.

Back in our main program, we need to send a TaskMessage instead of a simple string now:

        static void Main(string[] args)
        {
            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var actor = actorSystem.ActorOf(Props.Create<BusyActor>(), "BusyActor");

                actor.Tell(new TaskMessage("Task 1"));
                actor.Tell(new TaskMessage("Task 2"));
                actor.Tell(new TaskMessage("Task 3"));

                Console.ReadLine();
            }
        }

Let us now run this code:

akkanet-async-pipeto-interleaving

This is bad. Even with PipeTo(), we still have the same interleaving problem as before. If you think about it, it makes sense.

What we are doing is firing off a fire-and-forget task, and the method can return immediately, thus allowing the next message to be processed before the asynchronous operation has completed. This is exactly the same problem we had when using async void.

If you’re firing off an asynchronous operation that doesn’t touch anything else and you just want to take its result, then the suggested PipeTo() approach will work. But if you need a guarantee on message order because your message processing is touching some state (thus an older message might overwrite the results of a newer message), then this is going to be a problem.

Coupling and Cohesion

Another problem with using PipeTo() is that it… complicates things. You can already see how our original example has been bloated into something a lot less easy to work with, just for the sake of not using async/await. But there’s more.

One common pitfall I see when developers begin to understand the importance of decoupled software is that they go to the other extreme: they split up components into extremely granular classes. In doing so, they breaking the companion principle of coupling: cohesion. While coupling dictates that software components should have minimal dependencies between themselves, cohesion suggests that components with strong direct interrelations should work closely together. Making classes too granular, for instance, is another way to end up with messy software.

At the beginning of this article, I quoted Natan Vivo’s comment about having to break a database operation into multiple operations. Typically, in ADO .NET, a database operation would look something like this:

  1. Open a connection to the database.
  2. Execute a command (query, nonquery, etc) against the database.
  3. In case of a query, iterate over the rows and do something with them.

Each of the three operations above can be done asynchronously in sequence. They are meant to be together because they are part of the same cohesive operation. But if you break each of these operations into different messages and different message handlers, you’re going to scatter this otherwise contiguous operation all over the place. And that makes software a lot harder to maintain.

So when I see something like (to again quote the questions from the official Akka .NET PipeTo() sample) this:

“So just don’t do it. Await is evil inside an actor. Await is just syntactic sugar anyway. Use ContinueWith and PipeTo instead.”

…I feel the need to remind people that syntactic sugar is really important to make our software easier to write, but more importantly, easier to maintain.

For the reasons outlined above, I believe that the PipeTo() ‘pattern’ is really an anti-pattern, and I appeal for native asynchronous support in Akka .NET rather than quirky workarounds.

Multiplayer Game with Akka .NET and Publish/Subscribe

This article shows how to develop the basis of a client/server multiplayer game. The clients move around the playing field and their movements are broadcasted to all other clients via the server. This is done using the Publish/Subscribe (or Observer) pattern. The software is developed using Akka .NET, and the network communications implementation is based on Akka.Remote.

The source code for this article is available at the Gigi Labs BitBucket Repository.

Prerequisites

In this article, we’re going to develop a client/server application. In our solution, we’ll have two different console applications for client and server respectively. We’ll also have a third project: a class library containing message classes used by both. The client and server projects will reference this class library.

We’re going to use Akka .NET and its related Akka.Remote component in both client and server. These can be installed via NuGet.

Install-Package Akka
Install-Package Akka.Remote

Server

The server is actually the simplest part of this application. We have one actor which is a direct implementation of the Publish/Subscribe pattern. This ServerActor keeps track of subscribers (which are remote client actors in this case) using their respective subscriber GUIDs.

    public class ServerActor : TypedActor,
        IHandle<SubscribeMessage>,
        IHandle<UpdateLocationMessage>,
        IHandle<UnsubscribeMessage>
    {
        private Dictionary<Guid, IActorRef> subscribers;

        public ServerActor()
        {
            this.subscribers = new Dictionary<Guid, IActorRef>();
        }

        public void Handle(SubscribeMessage message)
        {
            this.subscribers[message.SubscriberGuid] = Sender;
        }

        public void Handle(UpdateLocationMessage message)
        {
            this.Broadcast(message);
        }

        public void Handle(UnsubscribeMessage message)
        {
            // remove subscription

            if (this.subscribers.ContainsKey(message.SubscriberGuid))
                this.subscribers.Remove(message.SubscriberGuid);

            // broadcast removal to everyone else

            this.Broadcast(message);
        }

        private void Broadcast<T>(T message) where T : class
        {
            foreach (var subscriber in this.subscribers.Values)
                subscriber.Tell(message);
        }
    }

Subscription and unsubscription are simple add/remove operations on the subscriber dictionary. When a client moves in the playing area, he sends an update to the server in the form of an UpdateLocationMessage. This is then broadcasted to all subscribers (in this case also to the sender) so that they can update the position of the client that moved.

The messages will be covered in the next section.

The server program does nothing more than create the ActorSystem, and an instance of the ServerActor within it:

        static void Main(string[] args)
        {
            var configManager = ConfigurationManager.AppSettings;
            string actorSystemName = configManager["actorSystemName"];

            Console.Title = $"{actorSystemName} - Server";

            try
            {
                using (var actorSystem = ActorSystem.Create(actorSystemName))
                {
                    var server = actorSystem.ActorOf(
                        Props.Create<ServerActor>(), "ServerActor");

                    string serverActorAddress = configManager["serverActorAddress"];
                    var remoteServerActor = actorSystem.ActorSelection(serverActorAddress);

                    Console.ReadLine();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }

In the server’s App.config, we need to use HOCON to set up Akka.Remote, mainly defining the port at which the server will listen:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka" />
  </configSections>

  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1" />
  </startup>

  <appSettings>
    <add key="actorSystemName" value="PubSubGame" />
  </appSettings>

  <akka>
    <hocon>
      <![CDATA[
        
        akka
        {
            actor
            {
                provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"
            }

            remote 
            {
                helios.tcp
                {
                    port = 7482
                    hostname = localhost
                }
            }
        }
      
      ]]>
    </hocon>
  </akka>
</configuration>

Common Messages

The message classes in the class library project, “GameMessages”, are required by both client and server because they are the means by which basic publish/subscribe interactions occur.

    public class SubscribeMessage
    {
        public Guid SubscriberGuid { get; }

        public SubscribeMessage(Guid subscriberGuid)
        {
            this.SubscriberGuid = subscriberGuid;
        }
    }

The SubscribeMessage is sent by the client when it starts. Through it, the server sets up an association between SubscriberGuid and the client’s IActorRef (with a dictionary entry). Although this association is not needed for message broadcast, it is necessary to remove the subscriber with that GUID when he unsubscribes.

    public class UnsubscribeMessage
    {
        public Guid SubscriberGuid { get; }
        public short LastX { get; }
        public short LastY { get; }

        public UnsubscribeMessage(Guid subscriberGuid,
            short lastX, short lastY)
        {
            this.SubscriberGuid = subscriberGuid;
            this.LastX = lastX;
            this.LastY = lastY;
        }
    }

The UnsubscribeMessage could in most cases be just a subscriber GUID. However, for our game, we are also including the last position of the client in the game area when he left the game. In this way, the other clients can update their view of the playing area by removing the client that left.

    public class UpdateLocationMessage
    {
        public char Avatar { get; }
        public short OldX { get; }
        public short OldY { get; }
        public short NewX { get; }
        public short NewY { get; }

        public UpdateLocationMessage(Guid subscriberGuid,
            char avatar, short oldX, short oldY, short newX, short newY)
        {
            this.SubscriberGuid = subscriberGuid;
            this.Avatar = avatar;
            this.OldX = oldX;
            this.OldY = oldY;
            this.NewX = newX;
            this.NewY = newY;
        }
    }

On startup and whenever a client moves, he sends an UpdateLocationMessage. In our case, this includes the new location of the client, his previous location, and an arbitrary character representing that client (avatar). The clients receiving this update will erase the client from the previous location, and draw him in the new location using the provided avatar.

Client Overview

akka-multiplayer-pubsub2

The client application is split up into three components.

  1. The main program logic sets up the ActorSystem and handles input from the console.
  2. The GameClientActor is responsible for communicating with the server. It sends messages to the server as a result of input coming from the main program logic, but also receives updates from the server. In both cases, updates are forwarded to the GameRenderingActor so that they can be drawn on the playing area in the console.
  3. The GameRenderingActor draws the playing area in the console. It keeps this view up to date as a result of the messages it receives from the GameClientActor.

Client – Main Program Logic

The main program for the client first takes care of setting up the ActorSystem, together with a GameClientActor and a GameRendererActor:

        static void Main(string[] args)
        {
            var configManager = ConfigurationManager.AppSettings;
            string actorSystemName = configManager["actorSystemName"];
            char avatar = configManager["avatar"][0];

            Console.Title = $"{actorSystemName} - Client";
            Console.OutputEncoding = Encoding.UTF8;
            Console.CursorVisible = false;

            try
            {
                using (var actorSystem = ActorSystem.Create(actorSystemName))
                {
                    short currentX = 40;
                    short currentY = 12;
                    Guid subscriberGuid = Guid.NewGuid();

                    var gameRendererActor = actorSystem.ActorOf(
                        Props.Create<GameRenderingActor>(), "GameRenderingActor");

                    var gameClientActor = actorSystem.ActorOf(
                        Props.Create<GameClientActor>(gameRendererActor,
                            currentX, currentY, subscriberGuid, avatar),
                        "GameClientActor");

                    HandleInput(gameClientActor);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }

Once the ActorSystem is set up, the client enters a game loop, which runs indefinitely until the ESC key is pressed or the client application is killed.

        static void HandleInput(IActorRef gameClientActor)
        {
            while (true)
            {
                var key = Console.ReadKey(intercept: true);

                switch (key.Key)
                {
                    case ConsoleKey.LeftArrow:
                        gameClientActor.Tell(new MoveLeftMessage());
                        break;
                    case ConsoleKey.RightArrow:
                        gameClientActor.Tell(new MoveRightMessage());
                        break;
                    case ConsoleKey.UpArrow:
                        gameClientActor.Tell(new MoveUpMessage());
                        break;
                    case ConsoleKey.DownArrow:
                        gameClientActor.Tell(new MoveDownMessage());
                        break;
                    case ConsoleKey.Escape:
                        gameClientActor.Tell(new QuitGameMessage());
                        return;
                }
            }
        }

Since what we have here is an IActorRef of the GameClientActor rather than a direct instance, we need to communicate with it by sending messages (as opposed to calling methods on it directly).

Each of the messages used by HandleInput() is an empty class, so they are not shown here for brevity.

Although these messages could go into the “GameMessages” shared class library, I opted to put them directly in the client project because the server does not need to know about them. I consider it good practice to keep classes only within the context in which they are needed, as I believe it minimises dependencies (and therefore coupling), though your mileage may vary.

Client – GameRenderingActor

The GameRenderingActor is capable of drawing the state of clients in the playing area by doing only one thing: drawing a single character at a given location in the console window.

    public class GameRenderingActor : TypedActor,
        IHandle<DrawMessage>,
        IHandle<EraseMessage>
    {
        public void Handle(DrawMessage message)
        {
            Console.SetCursorPosition(message.X, message.Y);
            Console.Write(message.Char);
        }

        public void Handle(EraseMessage eraseMessage)
        {
            var x = eraseMessage.X;
            var y = eraseMessage.Y;
            char @char = ' '; // overwrite with space to erase whatever is drawn

            var drawMessage = new DrawMessage(x, y, @char);
            Self.Tell(drawMessage);
        }
    }

As a matter of fact, it knows how to process an EraseMessage (to clear a client’s former position) and a DrawMessage (to draw a client’s current position). These messages include a position (X and Y), and DrawMessage also includes the character to draw (typically the client’s avatar).

The EraseMessage is translated into a DrawMessage (via a message to self) where the character to be drawn is a space. This effectively erases whatever was previously drawn.

In case you’re wondering, the ‘@’ symbol is used because char is a C# reserved word, and you can’t name a variable using a reserved word. The ‘@’ symbol allows you to work around this restriction.

Client – GameClientActor – Setup

The GameClientActor is relatively large because it maintains client state and at the same time coordinates between all the other components in the system (client main program, GameRenderingActor, and the server).

    public class GameClientActor : TypedActor,
        IHandle<UpdateLocationMessage>,
        IHandle<UnsubscribeMessage>,
        IHandle<MoveLeftMessage>,
        IHandle<MoveRightMessage>,
        IHandle<MoveUpMessage>,
        IHandle<MoveDownMessage>,
        IHandle<QuitGameMessage>
    {
        private IActorRef gameRenderingActor;
        private ActorSelection remoteServerActor;

        // client actor state
        private short currentX;
        private short currentY;
        private Guid subscriberGuid;
        private char avatar;

        public GameClientActor(IActorRef gameRenderingActor, short initialX,
            short initialY, Guid subscriberGuid, char avatar)
        {
            this.gameRenderingActor = gameRenderingActor;

            string serverActorAddress = ConfigurationManager
                .AppSettings["serverActorAddress"];
            this.remoteServerActor = Context.ActorSelection(serverActorAddress);

            this.currentX = initialX;
            this.currentY = initialY;
            this.subscriberGuid = subscriberGuid;
            this.avatar = avatar;

            this.Subscribe();
        }

//...

    }

The top of the class declaration makes it immediately evident what messages the GameClientActor is capable of processing (which is one of the reasons why I prefer the IHandle<> approach over ReceiveActors). These include movement/quit messages coming from user input from the main program logic, but also UpdateLocationMessages and UnsubscribeMessages coming from the server.

The constructor takes care of initialising client game state, based on what is passed in from the main program logic. However, the GameClientActor must also keep references to the other actors it talks to, i.e. the GameRenderingActor and the ServerActor running remotely on the server.

When all this state is set up, the client subscribes with the server and sends it an initial location update so that other clients may know it exists. These operations are done thanks to the following two helper methods:

        private void Subscribe()
        {
            // send subscribe message

            var subscribeMessage = new SubscribeMessage(subscriberGuid);
            remoteServerActor.Tell(subscribeMessage, Self);

            // send initial location

            SendLocationUpdate(0, 0);
        }

        private void SendLocationUpdate(short oldX, short oldY)
        {
            var updateLocationMessage = new UpdateLocationMessage(subscriberGuid,
                avatar, oldX, oldY, currentX, currentY);
            remoteServerActor.Tell(updateLocationMessage);
        }

Something very important to note here is that we’re including Self as a second parameter when sending the SubscribeMessage. That’s because of this code in the ServerActor:

        public void Handle(SubscribeMessage message)
        {
            this.subscribers[message.SubscriberGuid] = Sender;
        }

I’ve found that if you want to use a reference to Sender, you need to pass the sender as a second parameter when sending the original message. If you don’t, Sender defaults to some weird dead letter thingy. I don’t know if this is by design or some bug, but keep it in mind because it can bite you.

Client – Configuration

The client configuration is a little bit different from that of the server.

  <appSettings>
    <add key="avatar" value="X" />
    <add key="actorSystemName" value="PubSubGame" />
    <add key="serverActorAddress" value="akka.tcp://PubSubGame@localhost:7482/user/ServerActor" />
  </appSettings>

We can specify an avatar character that will be drawn to represent this client at its position. We will change this for each instance of the client that we run, so that we can distinguish between them.

We also need to set up the endpoint of the remote ServerActor in order to be able to communicate with it.

  <akka>
    <hocon>
      <![CDATA[
        
        akka
        {
            loglevel = ERROR
        
            actor
            {
                provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"
            }

            remote 
            {
                helios.tcp
                {
                    port = 0
                    hostname = localhost
                }
            }
        }
      
      ]]>
    </hocon>
  </akka>

Client connections normally don’t need to be bound to a specific port. Instead, by setting the port to 0, we let the operating system give us an available port of its own choosing for the client socket. This is perfectly fine because client connections are outgoing; so unlike server sockets, they don’t need to listen for connections on a particular port, and are not subject to firewall restrictions.

Note that we’re setting loglevel pretty high here. That’s so the typical Akka .NET output doesn’t mess with our drawing of the playing area.

Client – GameClientActor – Movement

A lot of the movement code in GameClientActor is the same: keep a copy of the old position, update the state with the new position, and then send a location update to the server with both the old and the new position. Thus a common helper method is used:

        private void MoveAndSendLocationUpdate(Action move)
        {
            short oldX = currentX;
            short oldY = currentY;

            move();

            this.SendLocationUpdate(oldX, oldY);
        }

The move action is simply a tweak in the location state. It is different for each movement message, and thus passed in to the helper method:

        public void Handle(MoveLeftMessage message)
        {
            this.MoveAndSendLocationUpdate(() => currentX--);
        }

        public void Handle(MoveRightMessage message)
        {
            this.MoveAndSendLocationUpdate(() => currentX++);
        }

        public void Handle(MoveUpMessage message)
        {
            this.MoveAndSendLocationUpdate(() => currentY--);
        }

        public void Handle(MoveDownMessage message)
        {
            this.MoveAndSendLocationUpdate(() => currentY++);
        }

The QuitMessage is a special case, as we need to send an UnsubscribeMessage to the server instead:

        public void Handle(QuitGameMessage message)
        {
            short oldX = currentX;
            short oldY = currentY;

            var unsubscribeMessage = new UnsubscribeMessage(subscriberGuid,
                oldX, oldY);
            remoteServerActor.Tell(unsubscribeMessage);
        }

Client – GameClientActor – Messages from Server

When a location update is received from the server, the GameClientActor passes instructions to the GameRenderingActor to erase the old position of the client that sent the update, and redraw it at its new position:

        public void Handle(UpdateLocationMessage message)
        {
            var eraseMessage = new EraseMessage(message.OldX, message.OldY);
            var drawMessage = new DrawMessage(message.NewX, message.NewY,
                message.Avatar);

            this.gameRenderingActor.Tell(eraseMessage);
            this.gameRenderingActor.Tell(drawMessage);
        }

The UnsubscribeMessage is handled similarly, but erases the old position only:

        public void Handle(UnsubscribeMessage message)
        {
            var eraseMessage = new EraseMessage(message.LastX, message.LastY);

            this.gameRenderingActor.Tell(eraseMessage);
        }

Let’s Run It!

Run one instance of the server. Then, run as many client instances as you like, each with a different avatar configuration. Press the arrow keys in each client console window to move your clients a little. When you move an avatar, you will see it move in all the other windows as well.

pubsubgame-output

Possible Improvement

One thing you’ll notice is that when a new client joins, he won’t see the other clients until they have moved and broadcasted an update. This is a limitation of not keeping client state on the server, and there are many ways to fix this. It is left as an exercise.

Consistent Hashing in Akka .NET

The source code for this article is available at the Gigi Labs BitBucket repository.

Routers

In Akka .NET, a router is like a load balancer. It is a special actor that does not handle messages itself, but passes them on to other actors who can handle them. For this reason routers are the only kind of actor that can deal with several messages concurrently (whereas normal actors process messages sequentially, one by one).

The way routers forward messages to handling actors depends on the type of router you use. Some common routing strategies include broadcast, round robin, and random. In this article, we will deal with the ConsistentHashing router. Consistent hashing means that messages with the same (arbitrarily defined) key are always handled by the same actor.

Another important distinction between routers is that they fall under two categories: Group routers and Pool routers. “Pool” means the same as in the context of “Thread Pool” or “Connection Pool”: it is a dynamic set of resources that can adaptively grow and shrink as needed. A Pool router creates the actors that it will forward messages to for handling, and as such, it also supervises them. Group routers, on the other hand, passed a set of actors that are created beforehand. As such the handling actors are fixed in number and detached from the router; the Group router does not supervise them and often does not know when they die. For this reason Pool routers are preferred for most use cases.

There is a lot more to be said about routers. However, this section is intended only as a brief background. For more comprehensive references, check the links in the Further Reading section.

Consistent Hashing Example with Currency Pairs

In the financial industry, currency exchange is defined in terms of a currency pair, such as EUR/USD. This currency pair has a price, such as 1.1087. This means that 1 Euro is worth 1.1087 US Dollars. The currency exchange market is very volatile, and these prices can change several times per second.

In our example, we will be generating fictitious currency prices. We would like to have a pool of actors to handle these price updates. We would also like each currency pair to be always be handled by the same actor.

As always, we first need to install the Akka NuGet package:

Install-Package Akka

Then, in our Main() method, we will first add some trivial setup code:

            Console.Title = "Akka .NET Consistent Hashing";

            var random = new Random();
            var currencyPairs = new string[]
            {
                "EUR/GBP",
                "USD/CAD",
                "NZD/JPY",
                "EUR/USD",
                "USD/JPY",
                "NZD/EUR"
            };

Our program logic goes like this:

            using (var actorSystem = ActorSystem.Create("MyActorSystem"))
            {
                var pool = new ConsistentHashingPool(3);
                var props = Props.Create<CurrencyPriceChangeHandlerActor>()
                    .WithRouter(pool);
                var router = actorSystem.ActorOf(props, "MyPool");

                for (int i = 0; i < 20; i++)
                    SendRandomMessage(router, random, currencyPairs);

                Console.ReadLine();
            }

Here we’re setting up a Pool router using the Consistent Hashing strategy. A pool of 3 actors will be created, supervised by the router. We can send a message to the router as we would with any other actor, but it will actually be handled by one of its child actors.

The child actors are of type CurrencyPriceChangeHandlerActor. This type of actor simply writes the received message to the console, along with its own path so that we can distinguish between the child actors. The path is dynamically generated by the Pool router and we have no control over it.

    public class CurrencyPriceChangeHandlerActor
        : TypedActor, IHandle<CurrencyPriceChangeMessage>
    {
        public CurrencyPriceChangeHandlerActor()
        {
            
        }

        public void Handle(CurrencyPriceChangeMessage message)
        {
            Console.WriteLine($"{Context.Self.Path} received: {message}");
        }
    }

The message handled by this type of actor is a simple combination of currency pair and price. In line with best practices, the message is immutable by design. More importantly, it implements IConsistentHashable. This allows us to provide a key that will be used for the consistent hashing algorithm. In our case, the key is the currency pair.

    public class CurrencyPriceChangeMessage : IConsistentHashable
    {
        public string CurrencyPair { get; }
        public decimal Price { get; }

        public object ConsistentHashKey
        {
            get
            {
                return this.CurrencyPair;
            }
        }

        public CurrencyPriceChangeMessage(string currencyPair, decimal price)
        {
            this.CurrencyPair = currencyPair;
            this.Price = price;
        }

        public override string ToString()
        {
            return $"{this.CurrencyPair}: {this.Price}";
        }
    }

Note: this is just one of three ways how we can specify the key to use with consistent hashing. Refer to the documentation for more information.

All we have left is the implementation of SendRandomMessage(). It picks a random currency pair and a random price, and sends a message. It also introduces a random delay between each message. Without this delay, you’ll see a lot of the same currency pairs in sequence.

        private static void SendRandomMessage(IActorRef router, Random random,
            string[] currencyPairs)
        {
            var randomDelay = random.Next(100, 1500);
            var randomCurrencyId = random.Next(0, currencyPairs.Length);
            var randomPrice = Convert.ToDecimal(random.NextDouble());

            var currencyPair = currencyPairs[randomCurrencyId];

            var message = new CurrencyPriceChangeMessage(
                currencyPair, randomPrice);
            router.Tell(message);

            Thread.Sleep(TimeSpan.FromMilliseconds(randomDelay));
        }

Here’s what we get when we run the program:

akkanet-consistenthashing-output

You can see how although all three handling actors are in use, there is a direct correspondence between the currency pair and the actor that handles it. For example, USD/CAD is always handled by actor $c, whereas NZD/EUR is always handled by actor $b. This is what is implied by consistent hashing.

As far as I can tell, control of which actors handle which keys is up to the router. I would have liked to, for instance, create an actor to specifically handle each currency pair. But I don’t think that is possible, even with Group routers (correct me if I’m wrong). Just let the router worry about how to allocate the keys to the actors.

Further Reading