F#, Back-End, and Thread-Safe Data Collection using MailboxProcessor

(F# Advent Calendar 2020: https://sergeytihon.com/2020/10/22/f-advent-calendar-in-english-2020/). Thanks to Sergey Tihon for organizing it again this year and to all participants for their early X’mas gifts!

Objective

Web development with F# is now fairly common and well documented, and it offers a fast way to build both the front end and the back end with a functional-first approach.
The SAFE Stack (https://safe-stack.github.io/) takes this approach: .NET 5 (the latest release and now the single .NET branch) and ASP.NET, supplemented by Giraffe, Suave or Saturn.

During my first experience with it, I had to find a proper way to implement, at the back-end level, a thread-safe collection to hold data, configuration and so on.
I also needed additional functionality, such as retrieving data from a database, saving new data to it, and logging operations (along with the errors that such I/O functions can produce).

That is why I decided to use Dependency Injection, which meant building a thread-safe way to achieve the goals above.

Mailbox Processor, simplified message/actor framework

MailboxProcessor is a built-in feature of F#: an agent that receives strongly typed messages, stores them in its own queue (inbox) and processes them one at a time.
It can be seen as a simplified version of the actor model offered by the Akka library (Scala/Java) and its .NET equivalent, Akka.NET. Communication can be bi-directional (equivalent to the Tell / Ask patterns offered by Akka).

Let’s implement a simple printfn example without keeping a state in between message processing.
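A minimal sketch of such an agent could look like the following. The name printerAgent and the message texts are illustrative choices of mine; only MailboxProcessor.Start, Receive and Post come from the F# core library.

```fsharp
// A stateless agent: it prints every string it receives and keeps nothing.
// `printerAgent` is an illustrative name.
let printerAgent =
    MailboxProcessor<string>.Start(fun inbox ->
        // Recursive loop: wait for a message, print it, wait for the next one.
        let rec loop () : Async<unit> =
            async {
                let! msg = inbox.Receive()
                printfn "Received: %s" msg
                return! loop ()
            }
        loop ())

// Post is fire-and-forget: it queues the message and returns immediately.
printerAgent.Post "Hello"
printerAgent.Post "World"
```

Post only enqueues the message, so the printing happens asynchronously on the agent's side.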

The MailboxProcessor<string> type annotation enforces that messages are of type string. We then create a recursive function returning an Async<unit>. The function cannot directly return anything other than unit. We will see later how we can reply to messages, but first, let’s see how we can keep a thread-safe state.

This time, our recursive function takes a parameter (for the first call, we pass an empty string list) and, after processing a message (still a string), prepends it to our list.
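As a sketch of this idea (collectorAgent is an illustrative name), the state is simply the argument of the recursive loop, so it is never shared or mutated:

```fsharp
// A stateful agent: the state travels as the argument of the recursive loop.
// `collectorAgent` is an illustrative name.
let collectorAgent =
    MailboxProcessor<string>.Start(fun inbox ->
        let rec loop (state: string list) : Async<unit> =
            async {
                let! msg = inbox.Receive()
                // Build a new list with the message as its head.
                let newState = msg :: state
                printfn "State is now: %A" newState
                return! loop newState
            }
        // First call: start with an empty list.
        loop [])

collectorAgent.Post "first"
collectorAgent.Post "second"
```

Because only the loop ever sees the list and messages are handled one at a time, no lock is needed.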

Let’s now use a more complex message type, which will allow more advanced functionality within our MailboxProcessor. F# offers a perfect way to model this: Discriminated Union (DU) types, also called sum types. They make the message model easy to represent and are ideal for pattern matching: the compiler warns when a match does not cover every case, and any modern IDE will flag it even earlier, so no case gets forgotten.

We are now using a more complex message type that lets us perform more complex operations on our MailboxProcessor.

AsyncReplyChannel<’T> is the way to do a PostAndReply (equivalent to an Ask in Akka). It also has an asynchronous version, PostAndAsyncReply. The type parameter defines the type of the returned answer: int for the number of elements, and a list of strings for all the elements. RemoveAllElements carries no reply channel: it only resets the state, so there is nothing to return to the caller.
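A message type along these lines might be declared as follows. Only RemoveAllElements is named in the text; the other case names are my own placeholders.

```fsharp
// Sketch of a DU message type. Apart from RemoveAllElements,
// the case names are placeholders chosen for illustration.
type Message =
    | AddElement of string                              // fire-and-forget
    | RemoveAllElements                                 // fire-and-forget, no payload
    | GetCount of AsyncReplyChannel<int>                // replies with the number of elements
    | GetAllElements of AsyncReplyChannel<string list>  // replies with every element
```

The cases carrying an AsyncReplyChannel are the ones meant to be sent with PostAndReply or PostAndAsyncReply; the others are sent with Post.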

Let’s now look at the implementation of the MailBoxProcessor taking our new Message type.

Executing a series of Post / PostAndReply calls shows that messages are processed one at a time and in order of arrival.
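Here is a self-contained sketch of that agent and of such a sequence of calls. The case names (other than RemoveAllElements) and the value names are assumptions of mine.

```fsharp
// The message type is repeated so the snippet runs on its own.
type Message =
    | AddElement of string
    | RemoveAllElements
    | GetCount of AsyncReplyChannel<int>
    | GetAllElements of AsyncReplyChannel<string list>

let agent =
    MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop (state: string list) : Async<unit> =
            async {
                let! msg = inbox.Receive()
                match msg with
                | AddElement e -> return! loop (e :: state)
                | RemoveAllElements -> return! loop []
                | GetCount ch ->
                    ch.Reply(List.length state)
                    return! loop state
                | GetAllElements ch ->
                    ch.Reply state
                    return! loop state
            }
        loop [])

agent.Post(AddElement "a")
agent.Post(AddElement "b")
// PostAndReply blocks until the agent calls Reply on the channel.
let count = agent.PostAndReply(fun ch -> GetCount ch)        // 2
let all = agent.PostAndReply(fun ch -> GetAllElements ch)    // ["b"; "a"]
agent.Post RemoveAllElements
let countAfter = agent.PostAndReply(fun ch -> GetCount ch)   // 0
```

The two AddElement messages are queued before GetCount, so the count reflects both of them even though Post returned immediately.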

Defining MailboxProcessor as a class

It is often useful to define a class containing our MailboxProcessor, with methods to access it in a more transparent way (instead of having to Post / PostAndReply one of the cases of our Message DU).

This time, we will use .NET generics and, instead of using a list to contain our 'T elements, we will use a Set. A Set is an immutable data collection from the F# core library which contains only unique elements. That is why, to use it, you need to make sure that your type 'T supports comparison (the F# comparison constraint, typically satisfied by implementing IComparable). For “basic” types (string, int, float …) there is nothing to implement (and the same goes for F# record types built from these basic types), but it may be needed for more complex data types.

Here, for demonstration, I have used the type string but we could consider something more complicated.
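A possible shape for such a class is sketched below. SafeSet, SetMessage and the member names are hypothetical; the 'T : comparison constraint is what Set requires.

```fsharp
// Hypothetical message type for a generic, Set-backed agent.
type SetMessage<'T when 'T : comparison> =
    | AddItem of 'T
    | RemoveItem of 'T
    | ClearItems
    | CountItems of AsyncReplyChannel<int>
    | GetItems of AsyncReplyChannel<Set<'T>>

// Hypothetical wrapper class: callers never see the messages.
type SafeSet<'T when 'T : comparison>() =
    let agent =
        MailboxProcessor<SetMessage<'T>>.Start(fun inbox ->
            let rec loop (state: Set<'T>) : Async<unit> =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | AddItem e -> return! loop (Set.add e state)
                    | RemoveItem e -> return! loop (Set.remove e state)
                    | ClearItems -> return! loop Set.empty
                    | CountItems ch ->
                        ch.Reply(Set.count state)
                        return! loop state
                    | GetItems ch ->
                        ch.Reply state
                        return! loop state
                }
            loop Set.empty)

    member _.Add(item: 'T) = agent.Post(AddItem item)
    member _.Remove(item: 'T) = agent.Post(RemoveItem item)
    member _.Clear() = agent.Post ClearItems
    member _.Count() = agent.PostAndReply(fun ch -> CountItems ch)
    member _.GetAll() = agent.PostAndReply(fun ch -> GetItems ch)

let names = SafeSet<string>()
names.Add "alice"
names.Add "bob"
names.Add "alice"          // duplicate: a Set keeps unique elements only
let n = names.Count()      // 2
```

Since a Set is immutable, handing it out through GetAll is safe: the caller gets a snapshot that later messages cannot change.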

Using a MailboxProcessor within our Back-end

Now that we have seen how we could use a MailboxProcessor to keep a thread-safe state, let’s adapt it to cover the following requirements:

  1. A class containing all of our MailboxProcessors
  2. Possibility to load data into them at start-up
  3. Possibility to know whether the initial loading process has been executed

For #3, we will now use a new Discriminated Union to reflect the current state of our MailboxProcessor.

We will also now return a Result<’T, ‘TError> type to reflect the current state. Doing so will allow us to manage, in a functional way, situations such as: data were not loaded, an error occurred while loading the data (as you may use non-pure functions such as those reading databases, files etc…), data were unloaded etc…

Let’s look at our new type to define the state of our MailboxProcessor.

Instead of just being a Set<’T>, we have now a more expressive way to describe the current state of our MailboxProcessor.

We are also adding a DU for the ‘TError type returned in the Result<’T,’TError>.

We are now using a slightly more complex Message type.

Async<Result<Set<’T>, exn>> is here a way to represent an Async job to be run within our MailboxProcessor in order to fetch the initial data. It can represent anything. We could also have decided to initialize our class with an original data set (not ideal, as you may want to wait before initializing data into your MailboxProcessor). This Async job may fail, which is reflected by the use of Railway Oriented Programming and the Result<’T,’TError> type.
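To make these three types concrete, here is one way they might be declared. All type and case names are placeholders of mine, and the set of cases is deliberately small.

```fsharp
// Hypothetical state of the agent: either nothing was loaded yet
// (or data were unloaded), or a Set of data is available.
type State<'T when 'T : comparison> =
    | NotLoaded
    | Loaded of Set<'T>

// Hypothetical error type used on the Error side of Result.
type StoreError =
    | DataNotLoaded
    | LoadingFailed of exn

// Hypothetical message type. Load carries the Async job that fetches
// the initial data, plus a channel to report how many items were loaded.
type Message<'T when 'T : comparison> =
    | Load of Async<Result<Set<'T>, exn>> * AsyncReplyChannel<Result<int, StoreError>>
    | AddItem of 'T * AsyncReplyChannel<Result<unit, StoreError>>
    | GetItems of AsyncReplyChannel<Result<Set<'T>, StoreError>>
    | Unload
```

Every request that can fail replies with a Result, so the caller has to handle the "not loaded" and "loading failed" situations explicitly.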

Let’s implement our revised MailboxProcessor.

By pattern-matching on the tuple (Message, State), we have now covered every possible combination.

We have also added methods to simplify the usage of our MailboxProcessor.

Let’s now check what we get. We will first define a simple record Name, which gets structural equality and comparison by default and therefore does not require us to implement IComparable. We also define a value name1 and an Async job simulating an operation to get the initial data (be it a database query, an API query etc…). Such operations are not pure and are asynchronous, hence the Async and Result types.

Running a few tests in F# Interactive returns the expected results.

I have decided to use PostAndAsyncReply to return an Async operation. This offers us more freedom in how we want to get our result.
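Putting the pieces together, a compact sketch of the whole flow could look like this. DataStore, the DU cases, the record fields and the sample values are all assumptions of mine; Async.Catch is added so that a job that throws does not kill the agent.

```fsharp
type State<'T when 'T : comparison> =
    | NotLoaded
    | Loaded of Set<'T>

type StoreError =
    | DataNotLoaded
    | LoadingFailed of exn

type Message<'T when 'T : comparison> =
    | Load of Async<Result<Set<'T>, exn>> * AsyncReplyChannel<Result<int, StoreError>>
    | AddItem of 'T * AsyncReplyChannel<Result<unit, StoreError>>
    | GetItems of AsyncReplyChannel<Result<Set<'T>, StoreError>>
    | Unload

type DataStore<'T when 'T : comparison>() =
    let agent =
        MailboxProcessor<Message<'T>>.Start(fun inbox ->
            let rec loop (state: State<'T>) : Async<unit> =
                async {
                    let! msg = inbox.Receive()
                    // Match on both the message and the current state.
                    match msg, state with
                    | Load (job, ch), _ ->
                        // Async.Catch turns a thrown exception into Choice2Of2.
                        let! outcome = Async.Catch job
                        match outcome with
                        | Choice1Of2 (Ok data) ->
                            ch.Reply(Ok (Set.count data))
                            return! loop (Loaded data)
                        | Choice1Of2 (Error e)
                        | Choice2Of2 e ->
                            ch.Reply(Error (LoadingFailed e))
                            return! loop NotLoaded
                    | AddItem (item, ch), Loaded data ->
                        ch.Reply(Ok ())
                        return! loop (Loaded (Set.add item data))
                    | AddItem (_, ch), NotLoaded ->
                        ch.Reply(Error DataNotLoaded)
                        return! loop state
                    | GetItems ch, Loaded data ->
                        ch.Reply(Ok data)
                        return! loop state
                    | GetItems ch, NotLoaded ->
                        ch.Reply(Error DataNotLoaded)
                        return! loop state
                    | Unload, _ -> return! loop NotLoaded
                }
            loop NotLoaded)

    member _.Load(job: Async<Result<Set<'T>, exn>>) =
        agent.PostAndAsyncReply(fun ch -> Load (job, ch))
    member _.Add(item: 'T) = agent.PostAndAsyncReply(fun ch -> AddItem (item, ch))
    member _.GetAll() = agent.PostAndAsyncReply(fun ch -> GetItems ch)
    member _.Unload() = agent.Post Unload

// A record gets structural equality and comparison for free.
type Name = { First: string; Last: string }
let name1 = { First = "Ada"; Last = "Lovelace" }

// Simulates a database or API call returning the initial data.
let loadJob : Async<Result<Set<Name>, exn>> =
    async { return Ok (Set.ofList [ name1 ]) }

let store = DataStore<Name>()
let beforeLoad = store.GetAll() |> Async.RunSynchronously   // Error DataNotLoaded
let loaded = store.Load loadJob |> Async.RunSynchronously   // Ok 1
let afterLoad = store.GetAll() |> Async.RunSynchronously    // Ok with a set containing name1
```

Each member returns an Async value, so the caller decides whether to run it synchronously, as here, or to compose it inside another async workflow.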

Dependency Injection

Now that we have defined, using MailboxProcessor, a thread-safe way to share data within our back-end, we just have to “add” a singleton service into our IServiceCollection.
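As a rough sketch of that registration, outside of any web host: the script below assumes the Microsoft.Extensions.DependencyInjection NuGet package and uses a small stand-in class (CounterStore, a name of mine) in place of the data store. In an ASP.NET application the same AddSingleton call would be made on the IServiceCollection handed to you when configuring services.

```fsharp
#r "nuget: Microsoft.Extensions.DependencyInjection"
open Microsoft.Extensions.DependencyInjection

// Stand-in for a MailboxProcessor-backed service (hypothetical names).
type CounterMessage = Next of AsyncReplyChannel<int>

type CounterStore() =
    let agent =
        MailboxProcessor<CounterMessage>.Start(fun inbox ->
            let rec loop (n: int) : Async<unit> =
                async {
                    let! (Next ch) = inbox.Receive()
                    ch.Reply(n + 1)
                    return! loop (n + 1)
                }
            loop 0)
    member _.Next() = agent.PostAndReply(fun ch -> Next ch)

// Register one shared instance for the whole application.
let services = ServiceCollection()
services.AddSingleton<CounterStore>(CounterStore()) |> ignore

let provider = services.BuildServiceProvider()
// Every resolution returns the same instance, hence the same agent and state.
let a = provider.GetRequiredService<CounterStore>()
let b = provider.GetRequiredService<CounterStore>()
let first = a.Next()    // 1
let second = b.Next()   // 2, because a and b share the same agent
```

Registering the service as a singleton matters here: a scoped or transient registration would create a new agent, and therefore a new empty state, for each request or resolution.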

Conclusion

This is just one of many ways to deal with data collections in a thread-safe way on your back-end. For example, .NET provides thread-safe versions of the most common collections. You may want to check https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/

However, I do like the MailboxProcessor approach, as it is thread-safe by nature (one message processed at a time) and very idiomatic F#.

This is also a way for those not familiar with MailboxProcessor to see what it can offer.

This article leaves a lot of things aside and is just a very simplified example of Dependency Injection.

As always, comments, corrections and questions are welcome.

