The critical section problem in Erlang

July 31, 2009 · Posted in Erlang

The programming language Erlang is based on micro-threads and asynchronous message passing. There is a (naive) belief that critical section problems cannot arise in languages based solely on message passing. The justification for this standpoint is the absence of mutex synchronization primitives, which are essential in shared-data based concurrent languages. As we will shortly see, it is more about how the data operations are designed than about which means of thread synchronization is used. But first…

A Little Bit of Theory

Critical Section

A critical section refers to the code section executed to operate on shared data. Shared data is some state that can be accessed by several threads concurrently. The conditions required for a critical section problem to occur are:

  1. A non-atomic mutating operation. That is, an operation consisting of the primitives READ/MODIFY/WRITE that are not executed as one single indivisible operation.
  2. Concurrent threads performing the operation.
  3. Interruption of the threads somewhere within the operation. That is, one thread currently performing the operation is suspended, and another starts and completes the operation before the first thread is allowed to proceed.
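The following small Java sketch makes condition 3 concrete. Java is used rather than Erlang purely for illustration, and the class name LostUpdate is hypothetical. The sketch writes out the interleaving of two threads, A and B, by hand, so the outcome is deterministic. A is suspended between its READ and its WRITE while B completes a whole deposit.

```java
// Hypothetical illustration: two "threads" A and B depositing 100 each into one
// shared balance, with the interleaving written out by hand so the result is fixed.
public class LostUpdate {
    private static int balance = 0;

    public static int interleavedDeposits() {
        balance = 0;
        int seenByA = balance;    // A: READ, then A is suspended (condition 3)
        int seenByB = balance;    // B: READ
        balance = seenByB + 100;  // B: MODIFY + WRITE, B completes its operation
        balance = seenByA + 100;  // A resumes: MODIFY + WRITE based on a stale READ
        return balance;           // 100 instead of 200: B's deposit was lost
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + interleavedDeposits());
    }
}
```

Running main prints `Final balance = 100`. Two deposits of 100 were made, but one of them was overwritten.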

Mutex Synchronization

In order to solve the problem (above), one has to violate at least one of the conditions. The standard way of doing that is to enforce so-called mutually exclusive access to the code section. This is normally done using some form of lock. Java has the keyword synchronized. On POSIX-compliant systems such as Linux, it may ultimately rely behind the scenes on POSIX threads primitives such as pthread_mutex_lock()/pthread_mutex_unlock().
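Here is a minimal sketch of the lock-based approach in Java. It assumes a hypothetical class, SynchronizedAccount, that mirrors the demo problem below: each thread deposits 100 N times and then withdraws 100 N times. The whole READ/MODIFY/WRITE sits inside one synchronized method. A thread can still be suspended halfway through an update, but no other thread can enter update() or get() until it leaves.

```java
// Hypothetical sketch: mutual exclusion via Java's synchronized keyword.
public class SynchronizedAccount {
    private int balance = 0;

    // Only one thread at a time can hold this object's monitor, so no other
    // thread can start (let alone complete) an update while one is in progress.
    public synchronized void update(int value) {
        int current = balance;       // READ
        int next = current + value;  // MODIFY
        balance = next;              // WRITE
    }

    public synchronized int get() {
        return balance;
    }

    // Mirrors the demo: each thread deposits 100 numUpdates times, then withdraws 100 numUpdates times.
    public static int run(int numThreads, int numUpdates) {
        SynchronizedAccount account = new SynchronizedAccount();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int n = 0; n < numUpdates; n++) account.update(+100);
                for (int n = 0; n < numUpdates; n++) account.update(-100);
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return account.get();
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + run(5, 10000));
    }
}
```

Removing the synchronized modifiers typically makes the final balance vary from run to run, much like the unsafe Erlang runs shown later.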

The Demo Problem

Consider a bank account with multiple updaters performing deposits and withdrawals (yes, this is a classic ;-)

We start the system with 1 account of balance 0 and U updaters, each of which performs 2N updates. First they deposit 100 (choose your currency here) N times, and then they withdraw 100 N times. The resulting balance should of course be 0.

Erlang Implementation

Account

Let’s start with the Erlang implementation of the account. The account is represented as a thread (a process in Erlang’s parlance). The function account_create/0 spawns a new process that executes the account body function, account_body/1. The body waits for any of a set of messages. Messages are sent using rendezvous (blocking/synchronous communication), realized with the helper functions send/2, recv/0 and reply/2.

account_create() ->
    {account, spawn(?MODULE, account_body, [0])}.

account_body(Balance) ->
    case recv() of
        {From, done} ->
            reply(Balance, From),
            done;
        {From, get} ->
            reply(Balance, From),
            account_body(Balance);
        {From, {set, Value}} ->
            BalanceNew = Value,
            reply(BalanceNew, From),
            account_body(BalanceNew);
        _ ->
            account_body(Balance)
    end.

account_get(Account)           -> send(get, Account).
account_set(Account, Value)    -> send({set,Value}, Account).
account_destroy(Account)       -> send(done, Account).

Updater

An updater performs all of its operations and then terminates, notifying the bank about its ‘departure’.

updater_create(Id, Account, NumUpdates, RunMode) ->
    {updater, Id, spawn(?MODULE, updater_body, [Id, Account, NumUpdates, RunMode])}.

updater_body(Id, Account, NumUpdates, RunMode) ->
    io:format("Updater-~w: started~n", [Id]),
    repeat(NumUpdates, fun() -> account_update(Account, +100, RunMode) end),
    repeat(NumUpdates, fun() -> account_update(Account, -100, RunMode) end),
    bank ! {updater, Id},
    io:format("Updater-~w: terminated~n", [Id]).

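%% The 'unsafe' clause. It ends with ';' because the 'safe' clause
%% (shown further down) follows it in the complete module.
account_update(Account, Value, unsafe) ->
    Balance    = account_get(Account),  %READ
    BalanceNew = Balance + Value,       %MODIFY
    account_set(Account, BalanceNew);   %WRITE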

The updater body uses a helper function, repeat/2, that takes a non-negative number and a closure as arguments. The closure performs one function call, account_update/3, which is the key to this blog post. The last parameter is the mode of operation. As you can see, so far I only show you the unsafe mode. First it calls account_get/1 to READ the balance value. Then it MODIFIES the value, and finally it WRITEs the value back using account_set/2, which sends the new balance value to the account thread. Both account_get/1 and account_set/2 are rendezvous functions. One might say that a rendezvous creates a mutex operation. It does, but only for each individual get or set, not for the READ/MODIFY/WRITE sequence as a whole.

Helpers

Let’s quickly show the helper functions, to get them out of the picture.

send(Msg, {_,To}) ->
    To ! {msg, self(), Msg},
    receive
        {reply, To, Result} -> Result
    end.

recv() ->
    receive
        {msg, From, Msg} -> {From, Msg}
    end.

reply(Msg, To) ->
    To ! {reply, self(), Msg}.

repeat(0, _Task) ->
    ok;
repeat(N, Task) when N > 0 ->
    Task(),
    repeat(N-1, Task).
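For readers coming from Java, the send/recv/reply helpers above can be mirrored roughly as follows. This is a sketch under several assumptions. The class Rendezvous and its nested Request type are hypothetical. A LinkedBlockingQueue plays the role of the process mailbox, and a CompletableFuture plays the role of the From that reply/2 answers.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical Java counterpart of send/2, recv/0 and reply/2.
public class Rendezvous<M, R> {

    // A received message together with the "From" to reply to.
    public static final class Request<M, R> {
        private final M msg;
        private final CompletableFuture<R> from = new CompletableFuture<>();

        private Request(M msg) { this.msg = msg; }

        public M msg() { return msg; }
    }

    // Plays the role of the receiving process's mailbox.
    private final BlockingQueue<Request<M, R>> mailbox = new LinkedBlockingQueue<>();

    // send/2: post the message, then block until the receiver replies.
    public R send(M msg) {
        Request<M, R> request = new Request<>(msg);
        mailbox.add(request);
        return request.from.join();
    }

    // recv/0: block until a message arrives.
    public Request<M, R> recv() throws InterruptedException {
        return mailbox.take();
    }

    // reply/2: hand the result back to the blocked sender.
    public void reply(R result, Request<M, R> to) {
        to.from.complete(result);
    }

    public static void main(String[] args) {
        Rendezvous<Integer, Integer> doubler = new Rendezvous<>();
        Thread server = new Thread(() -> {
            try {
                Request<Integer, Integer> request = doubler.recv();
                doubler.reply(request.msg() * 2, request);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.start();
        System.out.println(doubler.send(21)); // blocks until the server replies, then prints 42
    }
}
```

The sender stays blocked inside send() until some thread calls reply() on its request. This is the same blocking behaviour that send/2 gets from its receive.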

Bank

Finally, here is the main function of the bank, which sets up the system, runs it and tears it down again.

bank(NumThreads, NumUpdates, RunMode) ->
    io:format("Running the bank in ~w mode, with ~w updaters, performing 2*~w updates each~n",
              [RunMode, NumThreads, NumUpdates]),
    register(bank, self()),
    Account  = account_create(),
    Updaters = updater_create_all(NumThreads, Account, NumUpdates, RunMode),
    updater_wait_all(Updaters),
    Balance  = account_destroy(Account),
    unregister(bank),
    io:format("Final balance = ~w~n", [Balance]).

updater_create_all(NumThreads, Account, NumUpdates, RunMode) ->
    [updater_create(Id, Account, NumUpdates, RunMode) || Id <- lists:seq(1, NumThreads)].

updater_wait_all([]) ->
    ok;
updater_wait_all(Updaters) ->
    receive
        {updater, Id} ->
            updater_wait_all( lists:keydelete(Id, 2, Updaters) )
    end.

As you can see, the bank first creates the (single) account (object) and then all the updaters (threads). The updaters are created using a list comprehension, which is sort of a for-loop for functional languages. The right-hand side first generates a list of integers (1..N). That list then serves as the loop variable for the expression on the left-hand side, which creates a new updater for each value of Id. After that, the bank waits for all updaters to finish. This is done using the Erlang equivalent of a join-loop. It waits for an {updater,Id} message, removes the corresponding updater from the list and breaks the loop when the list is empty. The last task the bank performs before it closes is destroying the account and reading its final balance value. This value is then printed to the console.
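In Java one would normally just call Thread.join() or use a CountDownLatch. The message-based join-loop described above can nevertheless be sketched quite literally. JoinLoop is a hypothetical class. A LinkedBlockingQueue stands in for the bank's mailbox, and a HashSet stands in for the list of updaters.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical Java version of the bank's join-loop (updater_wait_all/1).
public class JoinLoop {

    // Starts workers 1..numWorkers; each announces its departure by id, the way an
    // updater does with bank ! {updater, Id}. Returns the number of departures consumed.
    public static int startAndWaitAll(int numWorkers) {
        BlockingQueue<Integer> departures = new LinkedBlockingQueue<>(); // the bank's mailbox
        Set<Integer> pending = new HashSet<>();                          // the Updaters list
        for (int id = 1; id <= numWorkers; id++) {                       // like lists:seq(1, N)
            final int myId = id;
            pending.add(myId);
            new Thread(() -> departures.add(myId)).start();
        }
        int received = 0;
        try {
            while (!pending.isEmpty()) {          // updater_wait_all([]) -> ok
                int id = departures.take();       // receive {updater, Id}
                pending.remove(id);               // lists:keydelete(Id, 2, Updaters)
                received++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Departures received: " + startAndWaitAll(5));
    }
}
```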

The bank module exports two public functions, run/3 and exe/1. The former performs some type checking and then invokes bank/3. The latter is used as the entry point when invoking the program from the console. I do not show them here, but you can find the complete code at the end of this post.

Execution

Time for the fun part: running the bank. Below you can find three runs with the same parameter settings.

Unsafe Mode

$ erl -noshell -pa target/beams -run bank exe 5 10000 unsafe
Running the bank in unsafe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-1: terminated
Updater-2: terminated
Updater-4: terminated
Updater-5: terminated
Updater-3: terminated
Final balance = -907300
$ erl -noshell -pa target/beams -run bank exe 5 10000 unsafe
Running the bank in unsafe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-4: terminated
Updater-3: terminated
Updater-2: terminated
Updater-5: terminated
Updater-1: terminated
Final balance = 343800
$ erl -noshell -pa target/beams -run bank exe 5 10000 unsafe
Running the bank in unsafe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-5: terminated
Updater-4: terminated
Updater-3: terminated
Updater-1: terminated
Updater-2: terminated
Final balance = 436800
$

Pretty interesting, huh? Why is this happening? To understand this fully, we must return to the theory at the top of this blog post. We can see that condition 1 is satisfied, because we perform the READ/MODIFY/WRITE operation in separate steps. Condition 2 is trivially satisfied as well. How about number 3?

A few words about scheduling

Erlang threads are scheduled using what is referred to as runtime-driven cooperative scheduling. Cooperative scheduling means that some party (the user or the runtime system) switches threads at (hopefully) well-chosen points in the execution. The opposite is interrupt-driven preemptive scheduling, where a thread switch literally happens like a “bolt from the blue”. Very close to this form of scheduling is multi-core/multi-CPU execution, which shows the same characteristics. This is the main reason for introducing atomic locks in computer systems. A language based on cooperative scheduling, such as Erlang, can choose wisely where to switch threads.

In a language based on interrupt-driven scheduling, a thread is allowed to run for at most a fixed time period. A clock interrupt fires at equidistant time steps. There is no such thing in Erlang. Instead, a thread is allowed to invoke functions a certain number of times (to be more precise, N reductions, where N is typically 1000-2000).

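Because the Erlang system has no understanding of the meaning of our account_update/3 function, it can (and does) switch the updater thread between any two function calls/reductions within account_update/3, as soon as the thread has used up its current share of reductions. On top of that, every rendezvous suspends the updater in the receive of send/2 until the account replies. This gives the other updaters a natural opportunity to run between the READ and the WRITE.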
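The effect of a reduction budget can be reproduced with a toy, single-threaded simulation. This is a sketch under loudly stated assumptions:

- the class ReductionScheduler is hypothetical;
- each READ, MODIFY and WRITE step costs exactly one "reduction" (the real BEAM counts reductions differently);
- simulated updaters are served round-robin with a fixed budget per turn.

```java
// Hypothetical toy model of reduction-based switching; not how the BEAM really works.
public class ReductionScheduler {

    // Simulates numUpdaters updaters, each doing numUpdates deposits of +100 and then
    // numUpdates withdrawals of -100 as separate READ, MODIFY and WRITE steps.
    // Each updater runs at most `budget` steps per turn before the next one is scheduled.
    public static int run(int numUpdaters, int numUpdates, int budget) {
        int balance = 0;                          // the shared account
        int totalSteps = 3 * 2 * numUpdates;      // 3 steps per update, 2N updates
        int[] pc = new int[numUpdaters];          // next step of each updater
        int[] local = new int[numUpdaters];       // each updater's private copy of the balance
        boolean unfinished = true;
        while (unfinished) {
            unfinished = false;
            for (int u = 0; u < numUpdaters; u++) {
                for (int used = 0; used < budget && pc[u] < totalSteps; used++) {
                    int value = (pc[u] / 3) < numUpdates ? +100 : -100;
                    switch (pc[u] % 3) {
                        case 0:  local[u] = balance;          break; // READ
                        case 1:  local[u] = local[u] + value; break; // MODIFY
                        default: balance  = local[u];         break; // WRITE
                    }
                    pc[u]++;
                }
                if (pc[u] < totalSteps) unfinished = true;
            }
        }
        return balance;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 1, 3)); // switches only between complete updates
        System.out.println(run(2, 1, 4)); // switches inside an update
    }
}
```

With a budget of 3, every switch falls between two complete updates, and main prints 0. With a budget of 4, the first updater is switched out right after the READ of its withdrawal, and the simulation ends with a balance of 100.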

Safe Mode

Now that we understand why the problem arises, we can move ahead and fix it. There is no such thing as a mutex lock in Erlang (although you can implement a process type that acts as one). So we have to stick to the (obvious) solution of moving the READ/MODIFY/WRITE logic into the target thread. The account thread receives a message and performs one operation fully before it moves on and serves the next message. This is our mutex. Below I show the missing code snippets.

account_update(Account, Value, safe) ->
    account_update(Account, Value).

account_update(Account, Value) -> send({update,Value}, Account).

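account_body(Balance) ->
    case recv() of
        {From, done} ->
            reply(Balance, From),
            done;
        {From, get} ->
            reply(Balance, From),
            account_body(Balance);
        {From, {set, Value}} ->
            BalanceNew = Value,
            reply(BalanceNew, From),
            account_body(BalanceNew);
        {From, {update, Value}} ->          %NEW: READ/MODIFY/WRITE in one process
            BalanceNew = Balance + Value,
            reply(BalanceNew, From),
            account_body(BalanceNew);
        _ -> account_body(Balance)
    end.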

The ‘safe’ version of account_update/3 invokes a new function, account_update/2, which sends the update message along with the value to the account thread. Within the account thread, the operation is now performed atomically, because no other operation on the balance can interleave with it. Even if the account thread is suspended because it runs out of reductions, no new operation will be started until the account resumes and completes the update it has begun. Enough talk, let’s see it in action.
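For comparison, the same idea can be sketched in Java: one thread owns the balance and serves one message at a time. The names ActorAccount, Update, create() and destroy() are hypothetical. A LinkedBlockingQueue acts as the mailbox, and a CompletableFuture carries the reply, so update() is a rendezvous like account_update/2.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical Java counterpart of the 'safe' account: only the owner thread touches balance.
public class ActorAccount {

    // The {update, Value} message plus the channel for the reply.
    private static final class Update {
        final int value;
        final CompletableFuture<Integer> reply = new CompletableFuture<>();
        Update(int value) { this.value = value; }
    }

    private final BlockingQueue<Update> mailbox = new LinkedBlockingQueue<>();
    private final Thread owner = new Thread(this::serve);
    private int balance = 0; // read and written only by the owner thread

    private ActorAccount() { owner.setDaemon(true); }

    public static ActorAccount create() {
        ActorAccount account = new ActorAccount();
        account.owner.start();
        return account;
    }

    // The account body: one message is served completely before the next is taken.
    private void serve() {
        try {
            while (true) {
                Update msg = mailbox.take();
                balance = balance + msg.value; // READ/MODIFY/WRITE, never interleaved
                msg.reply.complete(balance);
            }
        } catch (InterruptedException e) {
            // destroy() was called: stop serving
        }
    }

    // Rendezvous, like account_update/2: blocks until the owner has applied the update.
    public int update(int value) {
        Update msg = new Update(value);
        mailbox.add(msg);
        return msg.reply.join();
    }

    public void destroy() { owner.interrupt(); }

    // Mirrors the demo: each updater deposits 100 numUpdates times, then withdraws 100 numUpdates times.
    public static int run(int numUpdaters, int numUpdates) {
        ActorAccount account = create();
        Thread[] updaters = new Thread[numUpdaters];
        for (int i = 0; i < numUpdaters; i++) {
            updaters[i] = new Thread(() -> {
                for (int n = 0; n < numUpdates; n++) account.update(+100);
                for (int n = 0; n < numUpdates; n++) account.update(-100);
            });
            updaters[i].start();
        }
        try {
            for (Thread t : updaters) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        int finalBalance = account.update(0); // read the balance through the owner thread
        account.destroy();
        return finalBalance;
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + run(5, 10000));
    }
}
```

No lock appears anywhere in this sketch. Correctness comes solely from the fact that only the owner thread ever reads or writes balance.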

$ erl -noshell -pa target/beams -run bank exe 5 10000 safe
Running the bank in safe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-3: terminated
Updater-2: terminated
Updater-5: terminated
Updater-1: terminated
Updater-4: terminated
Final balance = 0
$ erl -noshell -pa target/beams -run bank exe 5 10000 safe
Running the bank in safe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-1: terminated
Updater-2: terminated
Updater-5: terminated
Updater-4: terminated
Updater-3: terminated
Final balance = 0
$ erl -noshell -pa target/beams -run bank exe 5 10000 safe
Running the bank in safe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-4: terminated
Updater-1: terminated
Updater-3: terminated
Updater-5: terminated
Updater-2: terminated
Final balance = 0
$

Well, this is not a proof. But even when running with a hundred updaters doing millions of updates each, we end up with a final balance of 0.

$ erl -noshell -pa target/beams -run bank exe 100 1000000 safe
Running the bank in safe mode, with 100 updaters, performing 2*1000000 updates each
Updater-1: started
Updater-2: started
...
Updater-99: started
Updater-100: started
Updater-7: terminated
Updater-29: terminated
Updater-90: terminated
...
Updater-16: terminated
Final balance = 0
$

Source Code

You will find a link to the source code below.

Compile it at the command line with

$ erlc bank.erl

or start the Erlang shell and compile/run it there

$ erl
1> c(bank).

Run it from the command line with

$ erl -noshell -run bank exe 10 1000 unsafe

This starts 10 updaters, each doing 2*1000 updates, all in ‘unsafe’ mode. Or, run it inside the Erlang shell with

2> bank:run(10, 1000, unsafe).

Final words

This Erlang demo program is part of the courseware for an Erlang Basics course I have been developing during the last couple of weeks. My Erlang course is marketed by Informator in Sweden and is initially intended for Ericsson.

If you are interested in attending the course, please don’t hesitate to contact Informator.

Using TMI instead of Actors

April 3, 2009 · Posted in Java

Here in Sweden the spring has finally arrived, and one of my duties is switching from winter to summer tires on my car. I did that this morning, and while waiting in the garage I was reading the two articles about Actor concurrency in JavaWorld. The first article described the actor semantics and how they are realized in the Erlang programming language. The second article described various ways to realize actors on top of the JVM, such as Scala, Groovy and Java itself.

I wasn’t aware of the multitude of actor implementations in Java that the second article describes. The author, Alex Miller, implemented the same problem (Rock-Paper-Scissors) for each actor realization and compared the code and effort. This was interesting reading indeed.

Thread Method Invocation (TMI)

Many years ago, back in 2002 to be exact, I implemented a library for thread method invocation (TMI) in Java. The idea was to provide various message passing techniques to Java, ranging from simple mailboxing to non-deterministic rendezvous. The library is essentially a set of different message queue implementations.
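The TMI code itself is not shown here. The most basic kind of message queue, a mailbox, can nevertheless be sketched with plain synchronized and wait/notifyAll. The class SimpleMailbox is hypothetical and is not the TMI API. It only illustrates an unbounded queue where put() never blocks and get() blocks while the mailbox is empty.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical unbounded mailbox; illustrates the idea, not the TMI library's API.
public class SimpleMailbox<T> {
    private final Deque<T> messages = new ArrayDeque<>();

    // Sender side: never blocks, because the mailbox is unbounded.
    public synchronized void put(T msg) {
        messages.addLast(msg);
        notifyAll(); // wake any receiver waiting in get()
    }

    // Receiver side: blocks while the mailbox is empty, then returns the oldest message.
    public synchronized T get() throws InterruptedException {
        while (messages.isEmpty()) {
            wait(); // releases the monitor while waiting; the loop guards against spurious wakeups
        }
        return messages.removeFirst();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleMailbox<String> box = new SimpleMailbox<>();
        Thread sender = new Thread(() -> { box.put("hello"); box.put("world"); });
        sender.start();
        System.out.println(box.get() + " " + box.get()); // FIFO order: "hello world"
        sender.join();
    }
}
```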

Together with the library I wrote a (long) tutorial. It started from the very beginning, motivating why synchronized and wait/notify/notifyAll are needed. It then moved step by step up the abstraction levels via different forms of message passing, ending in non-deterministic rendezvous inspired by the Ada programming language. The article was submitted to JavaWorld, but was declined. After that I was busy with so many other things that the idea and the library went into limbo. Perhaps it deserves a resurrection nowadays.

After having read the second article with its various Rock-Paper-Scissors implementations, I couldn’t resist the temptation to do my own implementation of Rock-Paper-Scissors using TMI. The rest of this post describes my implementation.

Before I continue, let me state that TMI is not an actors model. It is close in spirit, because it is a message passing model, but it does not honor the value passing semantics. You can pass whatever object you want as a sender argument or as a response object (rendezvous). Still, I consider TMI a very useful concurrency model that simplifies designing thread-based Java programs a lot.

Rock-Paper-Scissors using TMI

The main class is simple and straightforward, so let’s show it first. It creates two players and a coordinator and kicks the latter to start the game. The program runs for a specified number of seconds (20 by default).

public class RockPaperScissorsUsingTMI {
    public static final String ROCK     = "rock";
    public static final String PAPER    = "paper";
    public static final String SCISSORS = "scissors";

    public static void main(String[] args) throws InterruptedException {
        int     runIntervalInSeconds = 20;
        if (args.length > 0) runIntervalInSeconds = Integer.parseInt( args[0] );

        Player          p1 = new Player(1);
        Player          p2 = new Player(2);
        Coordinator     c  = new Coordinator(p1, p2);

        c.start(); p1.start(); p2.start();
        c.startPlay();

        Thread.sleep(runIntervalInSeconds * 1000);
    }
}

Here is a sample output of running the program.

>java -jar RockPaperScissorsUsingTMI-1.0.jar 15
   0) Player-1: scissors -> paper
   1) Player-1: scissors -> paper
   2) Player-1: scissors -> paper
   3) Player-1: rock -> scissors
   4) Player-2: scissors -> paper
   5) Player-2: rock -> scissors
   6)      TIE: scissors
   7)      TIE: scissors
   8) Player-2: rock -> scissors
   9) Player-2: scissors -> paper
  10) Player-2: rock -> scissors
  11) Player-1: rock -> scissors
  12) Player-2: rock -> scissors
  13) Player-1: scissors -> paper
  14) Player-2: rock -> scissors
  15)      TIE: paper
  16) Player-1: rock -> scissors
  17) Player-2: paper -> rock
  18)      TIE: rock
>

The Player

The player class below is simple, but illustrates the TMI realization. In this case I have chosen to stay close to the actors model and use an unbounded message queue. The interesting methods are play() and receive(), which show the ‘sender’ side and the ‘receiver’ side of a message queue. The sender invokes play(), which inserts a message into the queue. Because the queue is unbounded, this is not a blocking call. It’s easy to change the queue to a rendezvous queue, which results in the sender being blocked until the receiver puts a reply. Inside the run() method, a player receives a message carrying the coordinator. The player is blocked as long as the input queue is empty. It then waits for a random amount of time and performs a move. Finally, it responds back to the coordinator, using itself as the argument.
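The difference between an unbounded queue and a blocking hand-off can be seen with standard java.util.concurrent classes. These are used here as stand-ins for TMI's queues, which is an assumption, since TMI's own classes are not shown. LinkedBlockingQueue is unbounded, so a non-blocking offer() always succeeds. SynchronousQueue accepts a message only when a receiver is already waiting. Note that SynchronousQueue blocks a sender only until the message is taken, not until a reply is sent. It is therefore closer to a hand-off than to a full TMI rendezvous.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class contrasting two standard queue kinds.
public class QueueKinds {
    // Tries to deliver msg without blocking; returns whether the queue accepted it.
    public static boolean deliversWithoutWaiting(BlockingQueue<String> queue, String msg) {
        return queue.offer(msg);
    }

    public static void main(String[] args) {
        // Unbounded mailbox: the sender never has to wait for the receiver (prints true).
        System.out.println(deliversWithoutWaiting(new LinkedBlockingQueue<>(), "play"));
        // Hand-off queue: no receiver is waiting, so the message is refused (prints false).
        System.out.println(deliversWithoutWaiting(new SynchronousQueue<>(), "play"));
    }
}
```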

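import java.util.Arrays;
import java.util.List;
import java.util.Random;
// MessageQueue and UnboundedMessageQueue belong to the TMI library (import not shown).

public class Player extends Thread {
    private static final List<String>   moves = Arrays.asList(RockPaperScissorsUsingTMI.ROCK,
                                                              RockPaperScissorsUsingTMI.PAPER,
                                                              RockPaperScissorsUsingTMI.SCISSORS);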
    private static final Random         r = new Random();
    private MessageQueue                playMsgs = new UnboundedMessageQueue();
    private String                      move;

    public Player(int id) {
        super("Player-"+id);
        setDaemon(true);
    }

    public void run() {
        try {
            while (true) {
                Coordinator c = receive();
                delay( r.nextInt(1000) );
                move = makeMove();
                c.throwResult(this);
            }
        } catch (InterruptedException e) {System.out.println(getName()+" Interrupted");}
    }

    public void     play(Coordinator master) {
        try {
            playMsgs.put(master);
        } catch (InterruptedException e) { }
    }
    private Coordinator     receive() {
        try {
            return (Coordinator) playMsgs.get();
        } catch (InterruptedException e) {return null;}
    }

    private String    makeMove() {
        return moves.get( r.nextInt(moves.size()) );
    }
    public String getMove() {
        return move;
    }
    protected Player setMove(String move) {
        this.move = move;
        return this;
    }

    private void delay(int numMilliSecs) {
        try {
            sleep(10 + numMilliSecs);
        } catch (InterruptedException e) { }
    }
}

The Coordinator

The coordinator is slightly more complex, due to its responsibilities. It waits for a start message, then asks each player to play and receives the throw messages in whichever order they arrive. It then evaluates who won the round. The concurrency part is not very complicated. There are two unbounded queues, one for each message type. For each message type, the coordinator has a public sender method (startPlay/throwResult) and a private receive method (receiveStart/receiveThrow). The Permutation class is just a helper for computing the winner.

public class Coordinator extends Thread {
    private MessageQueue    startMsgs = new UnboundedMessageQueue(),
                            throwMsgs = new UnboundedMessageQueue();
    private Player          player1, player2;

    public Coordinator(Player player1, Player player2) {
        super("Coordinator");
        this.player1 = player1;
        this.player2 = player2;
        setDaemon(true);
    }

    public void run() {
        try {
            for (int count = 0; true; count++) {
                receiveStart();

                player1.play(this);
                player2.play(this);

                Player  first  = receiveThrow(),
                        second = receiveThrow();

                if (isTie(first, second)) {
                    System.out.printf("%4d) %8s: %s%n", count, "TIE", first.getMove());
                } else if (isWinning(first, second)) {
                    System.out.printf("%4d) %8s: %s -> %s%n",
                            count, first.getName(), first.getMove(), second.getMove());
                } else {
                    System.out.printf("%4d) %8s: %s -> %s%n",
                            count, second.getName(), second.getMove(), first.getMove());
                }

                this.startPlay();
            }
        } catch (InterruptedException e) {System.out.println(getName()+" Interrupted");}
    }

    public void     startPlay() throws InterruptedException {
        startMsgs.put(this);
    }
    private void    receiveStart() throws InterruptedException {
        startMsgs.get();
    }

    public void     throwResult(Player p) throws InterruptedException {
        throwMsgs.put(p);
    }
    private Player  receiveThrow() throws InterruptedException {
        return (Player) throwMsgs.get();
    }

    protected boolean isWinning(Player first, Player second) {
        Permutation permutation = new Permutation(first, second);
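        return  permutation.dominates(RockPaperScissorsUsingTMI.ROCK    , RockPaperScissorsUsingTMI.SCISSORS) ||
                permutation.dominates(RockPaperScissorsUsingTMI.PAPER   , RockPaperScissorsUsingTMI.ROCK)     ||
                permutation.dominates(RockPaperScissorsUsingTMI.SCISSORS, RockPaperScissorsUsingTMI.PAPER);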
    }

    protected boolean isTie(Player first, Player second) {
        return new Permutation(first, second).tie();
    }

    private static class Permutation {
        private Player  first, second;

        private Permutation(Player first, Player second) {
            this.first = first;
            this.second = second;
        }
        public boolean dominates(String m1, String m2) {
            return first.getMove().equals(m1) && second.getMove().equals(m2);
        }
        public boolean tie() {
            return first.getMove().equals( second.getMove() );
        }
    }
}

Source code

My application is built using Maven, which means it is very easy to download and unpack the sources and then type

mvn package
java -jar target\RockPaperScissorsUsingTMI-1.0.jar 60

Links