The critical section problem in Erlang
The programming language Erlang is based on micro-threads and asynchronous message passing. There is a (naive) belief that critical section problems cannot arise in languages based solely on message passing. The justification for this standpoint is the absence of mutex synchronization primitives, which are essential in concurrent languages based on shared data. As we will shortly see, it is more about the design of the data operations than about which means of thread synchronization is used. But first…
A Little Bit of Theory
Critical Section
A critical section refers to the code section executed to operate on shared data. Shared data is some state that can be accessed by several threads concurrently. The conditions required for a critical section problem to occur are:
- A non-atomic mutating operation. That means an operation consisting of the primitives READ/MODIFY/WRITE that are not executed as one single indivisible operation.
- Concurrent threads performing the operation.
- Interruption of a thread somewhere within the operation. That means one thread currently performing the operation is suspended, and another thread starts and completes the operation before the first thread is allowed to proceed.
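To make the three conditions concrete, here is a small Java sketch (the class and method names are made up for illustration) that replays, deterministically and on a single thread, the interleaving described in the last condition: thread A is suspended between its READ and its WRITE while thread B completes a whole deposit, so one of the two deposits of 100 is lost.

```java
// Hypothetical illustration: one bad interleaving of two deposits, replayed by hand.
public class LostUpdateDemo {
    private static int balance;   // the shared data

    public static int interleavedDeposits() {
        balance = 0;
        int readByA = balance;     // thread A: READ (sees 0)
        // A is suspended here (condition 3); thread B runs its whole operation:
        int readByB = balance;     // thread B: READ (sees 0)
        balance = readByB + 100;   // thread B: MODIFY and WRITE (balance is now 100)
        // A resumes with the value it read before B's write:
        balance = readByA + 100;   // thread A: MODIFY and WRITE (balance is 100 again)
        return balance;            // 100 instead of 200: B's deposit is lost
    }

    public static void main(String[] args) {
        System.out.println("Expected 200, got " + interleavedDeposits());
    }
}
```

Running it prints "Expected 200, got 100".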
Mutex Synchronization
In order to solve the problem above, one has to violate at least one of the conditions. The standard way of doing that is to realize so-called mutually exclusive access to the code section, which is normally done using some form of lock. Java has the keyword synchronized, which behind the scenes relies on operating-system primitives, such as POSIX pthread mutexes on POSIX-compliant systems like Linux, when threads contend for the lock.
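As a sketch of the lock-based approach in Java (the class SynchronizedAccount and its methods are hypothetical), the whole READ/MODIFY/WRITE is placed in a synchronized method, so at most one thread at a time can be inside it. The run method mimics the demo problem described next: several updaters, each depositing 100 N times and then withdrawing 100 N times.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical account: 'synchronized' turns each update into a critical section.
public class SynchronizedAccount {
    private int balance = 0;

    public synchronized void update(int value) {
        balance = balance + value;   // READ, MODIFY and WRITE while holding the object's monitor
    }

    public synchronized int getBalance() {
        return balance;
    }

    /** Runs 'threads' updaters, each doing n deposits of 100 followed by n withdrawals of 100. */
    public static int run(int threads, int n) throws InterruptedException {
        SynchronizedAccount account = new SynchronizedAccount();
        List<Thread> updaters = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int k = 0; k < n; k++) account.update(+100);
                for (int k = 0; k < n; k++) account.update(-100);
            });
            updaters.add(t);
            t.start();
        }
        for (Thread t : updaters) t.join();   // wait for all updaters to finish
        return account.getBalance();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Final balance = " + run(5, 10000));
    }
}
```

Removing the synchronized keyword from update() typically makes the final balance drift away from 0, with a different value on each run.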
The Demo Problem
Consider a bank account with multiple updaters performing deposits and withdrawals (yes, this is a classic ;-)).
We start the system with 1 account with a balance of 0, and U updaters that each perform 2N updates. First they deposit 100 (choose your currency here) N times, and then they withdraw 100 N times. The resulting balance should of course be 0.
Erlang Implementation
Account
Let’s start with the Erlang implementation of the account. The account is represented as a thread (a process in Erlang’s parlance). The create function, account_create/0, starts a new thread that executes the account body function. The body waits for any of a set of messages. A message is sent using rendezvous (blocking/synchronous communication), realized with the functions send/2, recv/0 and reply/2.
account_create() ->
    {account, spawn(?MODULE, account_body, [0])}.
account_body(Balance) ->
    case recv() of
        {From, done} ->
            reply(Balance, From),
            done;
        {From, get} ->
            reply(Balance, From),
            account_body(Balance);
        {From, {set, Value}} ->
            BalanceNew = Value,
            reply(BalanceNew, From),
            account_body(BalanceNew);
        _ ->
            account_body(Balance)
    end.
account_get(Account) -> send(get, Account).
account_set(Account, Value) -> send({set, Value}, Account).
account_destroy(Account) -> send(done, Account).
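For readers coming from Java, the account process can be approximated as below. This is a hypothetical sketch, not part of the bank module: a thread owns the balance and takes one request at a time from a mailbox (a LinkedBlockingQueue standing in for recv/0), and every request carries a CompletableFuture that plays the role of reply/2, so the caller blocks until it is answered, just like send/2.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical Java analogue of account_body/1 (not part of the bank module).
public class AccountActor {
    private enum Op { GET, SET, DONE }

    // One message in the mailbox: the operation, its argument and a reply slot.
    private static final class Request {
        final Op op;
        final int value;
        final CompletableFuture<Integer> reply = new CompletableFuture<>();

        Request(Op op, int value) {
            this.op = op;
            this.value = value;
        }
    }

    private final BlockingQueue<Request> mailbox = new LinkedBlockingQueue<>();

    public AccountActor() {
        Thread body = new Thread(this::body, "account");
        body.setDaemon(true);
        body.start();
    }

    // The account "process": serves exactly one request at a time.
    private void body() {
        int balance = 0;
        try {
            while (true) {
                Request r = mailbox.take();          // like recv/0
                switch (r.op) {
                    case GET:
                        r.reply.complete(balance);   // like reply/2
                        break;
                    case SET:
                        balance = r.value;
                        r.reply.complete(balance);
                        break;
                    case DONE:
                        r.reply.complete(balance);
                        return;                      // the account thread terminates
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Like send/2: post the request, then block until the account replies.
    private int send(Op op, int value) {
        Request r = new Request(op, value);
        mailbox.add(r);
        try {
            return r.reply.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public int get()          { return send(Op.GET, 0); }
    public int set(int value) { return send(Op.SET, value); }
    public int destroy()      { return send(Op.DONE, 0); }
}
```

Each individual get() or set() is served as a unit, but nothing prevents another caller's request from being served between one caller's get() and its following set().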
Updater
An updater performs all of its operations and then terminates, notifying the bank about its ‘departure’.
updater_create(Id, Account, NumUpdates, RunMode) ->
    {updater, Id, spawn(?MODULE, updater_body, [Id, Account, NumUpdates, RunMode])}.
updater_body(Id, Account, NumUpdates, RunMode) ->
    io:format("Updater-~w: started~n", [Id]),
    repeat(NumUpdates, fun() -> account_update(Account, +100, RunMode) end),
    repeat(NumUpdates, fun() -> account_update(Account, -100, RunMode) end),
    bank ! {updater, Id},
    io:format("Updater-~w: terminated~n", [Id]).
account_update(Account, Value, unsafe) ->
    Balance = account_get(Account),        % READ
    BalanceNew = Balance + Value,          % MODIFY
    account_set(Account, BalanceNew);      % WRITE
    %% the 'safe' clause of account_update/3 is shown in the Safe Mode section
The updater body uses a helper function repeat/2 that takes a non-negative count and a closure as arguments. The closure performs one function call, account_update/3, which is the key to this blog post. Its last parameter is the mode of operation; for now I only show you the unsafe mode. First it calls account_get/1 to READ the balance value, then it MODIFIES the value, and finally it WRITES the value back using account_set/2, which sends the new balance value to the account thread. Both account_get/1 and account_set/2 are rendezvous functions. One might say that a rendezvous creates a mutex operation, since the account thread serves one message at a time.
Helpers
Let’s quickly show the helper functions, to get them out of the picture.
send(Msg, {_,To}) ->
    To ! {msg, self(), Msg},
    receive
        {reply, To, Result} -> Result
    end.
recv() ->
    receive
        {msg, From, Msg} -> {From, Msg}
    end.
reply(Msg, To) ->
    To ! {reply, self(), Msg}.
repeat(0, _Task) ->
    ok;
repeat(N, Task) when N > 0 ->
    Task(),
    repeat(N-1, Task).
Bank
Finally, here is the main function of the bank, which sets up the system, runs it and tears it down again.
bank(NumThreads, NumUpdates, RunMode) ->
    io:format("Running the bank in ~w mode, with ~w updaters, performing 2*~w updates each~n",
              [RunMode, NumThreads, NumUpdates]),
    register(bank, self()),
    Account = account_create(),
    Updaters = updater_create_all(NumThreads, Account, NumUpdates, RunMode),
    updater_wait_all(Updaters),
    Balance = account_destroy(Account),
    unregister(bank),
    io:format("Final balance = ~w~n", [Balance]).
updater_create_all(NumThreads, Account, NumUpdates, RunMode) ->
    [updater_create(Id, Account, NumUpdates, RunMode) || Id <- lists:seq(1, NumThreads)].
updater_wait_all([]) ->
    ok;
updater_wait_all(Updaters) ->
    receive
        {updater, Id} ->
            updater_wait_all( lists:keydelete(Id, 2, Updaters) )
    end.
As you can see, the bank first creates the (single) account (object) and then all the updaters (threads). The updaters are created using a list comprehension, which is sort of a for-loop for functional languages. On the right-hand side, it first generates a list of integers (1..NumThreads), which is then used as the loop variable for the expression on the left, which creates a new updater for each value of Id. After that, the bank waits for all updaters to finish. This is performed using the Erlang equivalent of a join-loop: it waits for an {updater, Id} message, removes that updater from the list of updaters, and breaks the loop when the list is empty. The last task the bank performs before it closes is destroying the account and reading its final balance value. This value is then printed to the console.
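A rough Java counterpart of updater_create_all/4 and updater_wait_all/1 might look like the sketch below (the class JoinLoop and the method runAndWait are hypothetical). Each worker posts its id to a mailbox when it finishes, mirroring bank ! {updater, Id}, and the bank removes ids from its pending set until the set is empty.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch mirroring updater_create_all/4 and updater_wait_all/1.
public class JoinLoop {
    public static Set<Integer> runAndWait(int numThreads) throws InterruptedException {
        BlockingQueue<Integer> bankMailbox = new LinkedBlockingQueue<>();
        Set<Integer> pending = new HashSet<>();

        // Like [updater_create(Id, ...) || Id <- lists:seq(1, NumThreads)]
        for (int id = 1; id <= numThreads; id++) {
            final int myId = id;
            pending.add(myId);
            new Thread(() -> {
                // ... an updater's real work would go here ...
                bankMailbox.add(myId);           // like: bank ! {updater, Id}
            }).start();
        }

        // Like updater_wait_all/1: remove each finished id until none is pending.
        Set<Integer> finished = new HashSet<>();
        while (!pending.isEmpty()) {
            int id = bankMailbox.take();
            pending.remove(id);
            finished.add(id);
        }
        return finished;
    }
}
```

In everyday Java one would more likely call Thread.join() on each thread or use a java.util.concurrent.CountDownLatch; the mailbox version only serves to mirror the message-based join-loop.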
The bank module exports two public functions, run/3 and exe/1. The former performs some type checking and then invokes bank/3. The latter is used as the entry point when invoking the program from the console. I do not show them here, but you can find the complete code at the end of this post.
Execution
Time for the fun part: running the bank. Below you can find a couple of runs with the same parameter settings.
Unsafe Mode
$ erl -noshell -pa target/beams -run bank exe 5 10000 unsafe
Running the bank in unsafe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-1: terminated
Updater-2: terminated
Updater-4: terminated
Updater-5: terminated
Updater-3: terminated
Final balance = -907300
$ erl -noshell -pa target/beams -run bank exe 5 10000 unsafe
Running the bank in unsafe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-4: terminated
Updater-3: terminated
Updater-2: terminated
Updater-5: terminated
Updater-1: terminated
Final balance = 343800
$ erl -noshell -pa target/beams -run bank exe 5 10000 unsafe
Running the bank in unsafe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-5: terminated
Updater-4: terminated
Updater-3: terminated
Updater-1: terminated
Updater-2: terminated
Final balance = 436800
$
Pretty interesting, huh? Why is this happening? To understand this fully, we must return to the theory at the top of this blog post. We can see that condition 1 is satisfied, because we perform the READ/MODIFY/WRITE operation in separate steps. Condition 2 is trivially satisfied as well. How about number 3?
A few words about scheduling
Erlang threads are scheduled using what is referred to as runtime-driven cooperative scheduling. Cooperative scheduling means that some party (the user or the runtime system) switches threads at (hopefully) well-chosen points in the execution. The opposite is interrupt-driven preemptive scheduling, where a thread switch literally happens like a “bolt from a clear sky”. Very close to this form of scheduling is multi-core/multi-CPU execution, which shows the same characteristics. This is the main reason for introducing atomic locks in computer systems. A language based on cooperative scheduling, such as Erlang, can choose wisely where to switch threads.
In a language based on interrupt-driven scheduling, a thread is allowed to run for at most a fixed time period, enforced by a clock interrupt that fires at equidistant time steps. There is no such thing in Erlang. Instead, a thread is allowed to invoke functions a certain number of times. (To be more precise, N reductions, where N typically is 1000-2000.)
Because the Erlang system has no understanding of the meaning of our account_update/3 function, it can (and does) switch the updater thread between any two function calls/reductions within account_update/3, as soon as the thread has used up its current share of reductions.
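The effect can be reproduced with a toy model in Java. This is a deliberately simplified, hypothetical simulation, not how the BEAM scheduler is implemented: every update consists of three steps (READ, MODIFY, WRITE), each step is assumed to cost one reduction, and a round-robin scheduler switches to the next updater after a fixed budget of steps. When the budget is a multiple of 3, every switch falls between two complete updates; otherwise switches land in the middle of an update. The example uses one depositor and one withdrawer, so the expected final balance is 0.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of budget-based thread switching (a simplification, not the BEAM).
public class ReductionModel {
    private static int balance;   // the shared account

    private static final class Updater {
        private final int[] values;   // the updates to perform, in order
        private int next = 0;         // index of the current update
        private int step = 0;         // 0 = READ, 1 = MODIFY, 2 = WRITE
        private int local;            // this updater's private copy of the balance

        Updater(int[] values) { this.values = values; }

        boolean done() { return next == values.length; }

        void executeOneStep() {       // one step = one assumed "reduction"
            switch (step) {
                case 0:  local = balance; break;                // READ
                case 1:  local = local + values[next]; break;   // MODIFY
                default: balance = local; next++; break;        // WRITE
            }
            step = (step + 1) % 3;
        }
    }

    /** Round-robin over the updaters, 'budget' steps at a time; returns the final balance. */
    public static int run(int[][] updates, int budget) {
        balance = 0;
        Deque<Updater> runQueue = new ArrayDeque<>();
        for (int[] values : updates) runQueue.add(new Updater(values));
        while (!runQueue.isEmpty()) {
            Updater u = runQueue.poll();
            for (int r = 0; r < budget && !u.done(); r++) u.executeOneStep();
            if (!u.done()) runQueue.add(u);   // budget used up: suspend and switch
        }
        return balance;
    }

    public static void main(String[] args) {
        int[][] updates = { {100, 100, 100}, {-100, -100, -100} };   // depositor, withdrawer
        System.out.println("budget 3: " + run(updates, 3));
        System.out.println("budget 2: " + run(updates, 2));
    }
}
```

With a budget of 3 the run ends at 0; with a budget of 2 it ends at -300, because each updater keeps writing back a balance it read before the other updater's write.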
Safe Mode
Now that we understand why the problem arises, we can move ahead and fix it. There is no such thing as a mutex lock in Erlang (although you can implement a thread type that acts as one), so we stick to the (obvious) solution of moving the READ/MODIFY/WRITE logic into the target thread. The account thread receives a message and performs one operation fully before it moves on and serves the next message. This is our mutex. Below I show the missing code snippets.
account_update(Account, Value, safe) ->
    account_update(Account, Value).
account_update(Account, Value) -> send({update,Value}, Account).
account_body(Balance) ->
    case recv() of
        %. . .
        {From, {update, Value}} ->
            BalanceNew = Balance + Value,
            reply(BalanceNew, From),
            account_body(BalanceNew);
        %. . .
    end.
The ‘safe’ version of account_update/3 invokes a new function, account_update/2, which sends the update message along with the value to the account thread. Within the account thread, the operation is now performed atomically, because nothing can interrupt its sequence of operations: even if the account thread is suspended because it runs out of reductions, no new operation will be started until the account resumes and completes the update it has begun. Nuff talk, let’s see it in action.
$ erl -noshell -pa target/beams -run bank exe 5 10000 safe
Running the bank in safe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-3: terminated
Updater-2: terminated
Updater-5: terminated
Updater-1: terminated
Updater-4: terminated
Final balance = 0
$ erl -noshell -pa target/beams -run bank exe 5 10000 safe
Running the bank in safe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-1: terminated
Updater-2: terminated
Updater-5: terminated
Updater-4: terminated
Updater-3: terminated
Final balance = 0
$ erl -noshell -pa target/beams -run bank exe 5 10000 safe
Running the bank in safe mode, with 5 updaters, performing 2*10000 updates each
Updater-1: started
Updater-2: started
Updater-3: started
Updater-4: started
Updater-5: started
Updater-4: terminated
Updater-1: terminated
Updater-3: terminated
Updater-5: terminated
Updater-2: terminated
Final balance = 0
$
Well, this is not a proof. But even when running with hundreds of updaters doing millions of updates, we end up with a final balance of 0.
$ erl -noshell -pa target/beams -run bank exe 100 1000000 safe
Running the bank in safe mode, with 100 updaters, performing 2*1000000 updates each
Updater-1: started
Updater-2: started
...
Updater-99: started
Updater-100: started
Updater-7: terminated
Updater-29: terminated
Updater-90: terminated
...
Updater-16: terminated
Final balance = 0
$
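For comparison, the same fix can be sketched in Java by confining the balance to one owner thread. In this hypothetical ConfinedAccount, a single-thread ExecutorService plays the role of the account process, and update() ships the entire READ/MODIFY/WRITE to it as one task, much like the {update, Value} message. Because the owner thread runs one task at a time, no explicit lock is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical Java analogue of the 'safe' mode: the balance is confined to one thread.
public class ConfinedAccount {
    private final ExecutorService owner = Executors.newSingleThreadExecutor();
    private int balance = 0;   // only ever read or written by the owner thread

    /** Like account_update/2: blocks until the owner has applied the update. */
    public int update(int value) throws InterruptedException, ExecutionException {
        return owner.submit(() -> {
            balance = balance + value;   // READ, MODIFY and WRITE in one uninterrupted task
            return balance;
        }).get();
    }

    /** Like account_destroy/1: returns the final balance and stops the owner thread. */
    public int destroy() throws InterruptedException, ExecutionException {
        int finalBalance = owner.submit(() -> balance).get();
        owner.shutdown();
        return finalBalance;
    }

    /** Each of the updaters deposits 100 n times, then withdraws 100 n times. */
    public static int run(int numUpdaters, int n) throws Exception {
        ConfinedAccount account = new ConfinedAccount();
        List<Thread> updaters = new ArrayList<>();
        for (int i = 0; i < numUpdaters; i++) {
            Thread t = new Thread(() -> {
                try {
                    for (int k = 0; k < n; k++) account.update(+100);
                    for (int k = 0; k < n; k++) account.update(-100);
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            });
            updaters.add(t);
            t.start();
        }
        for (Thread t : updaters) t.join();
        return account.destroy();
    }
}
```

ConfinedAccount.run(5, 10000) corresponds to the runs above and returns 0.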
Source Code
You will find a link to the source code below.
Compile it at the command line with
$ erlc bank.erl
or start the Erlang shell and compile/run it there
$ erl
1> c(bank).
Run it from the command line with
$ erl -noshell -run bank exe 10 1000 unsafe
where 10 updaters are started, each doing 2*1000 updates, all in ‘unsafe’ mode. Or, run it inside the Erlang shell with
2> bank:run(10, 1000, unsafe).
Final words
This Erlang demo program is part of the courseware for a course in Erlang Basics that I have been developing during the last couple of weeks. My Erlang course is marketed by Informator in Sweden and is initially intended for Ericsson.
If you are interested in attending the course, please don’t hesitate to contact Informator.
Using TMI instead of Actors
Here in Sweden the spring has finally arrived, and one of the duties I have is switching from winter to summer tires on my car. I did that this morning, and while waiting in the garage I was reading the two articles about Actor concurrency in JavaWorld. The first article described the actor semantics and how they are realized in the Erlang programming language, and the second article described various ways to realize actors on top of the JVM, such as Scala, Groovy and Java itself.
I wasn’t aware of the multitude of actor implementations in Java that the second article describes. The author, Alex Miller, implemented the same problem (Rock-Paper-Scissors) for each actor realization and compared the code and effort. This was interesting reading indeed.
Thread Method Invocation (TMI)
Many years ago, back in 2002 to be exact, I implemented a library for thread method invocation (TMI) in Java. The idea was to bring various message passing techniques to Java, ranging from simple mailboxing to non-deterministic rendezvous. The library is essentially a set of different message queue implementations.
Together with the library I wrote a (long) tutorial that started from the very beginning, motivating why synchronized and wait/notify/notifyAll are needed, and then moved step by step up the abstraction levels via different forms of message passing, ending in non-deterministic rendezvous, inspired by the Ada programming language. The article was submitted to JavaWorld, but was declined. After that I was busy with so many other things that the idea and the library went into limbo. Perhaps it might be resurrected nowadays.
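The TMI library itself is not shown in this post, so the following is only a hypothetical sketch of what its simplest queue could look like. The names MessageQueue and UnboundedMessageQueue are borrowed from how the Player and Coordinator classes use them (put/get, both interruptible), and the implementation uses exactly the monitor tools mentioned above: synchronized plus wait/notifyAll.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: the real TMI library is not shown, so these names only mirror
// how Player and Coordinator use it.
public class TmiQueues {
    public interface MessageQueue {
        void put(Object msg) throws InterruptedException;
        Object get() throws InterruptedException;
    }

    // Unbounded mailbox built from synchronized + wait/notifyAll.
    public static class UnboundedMessageQueue implements MessageQueue {
        private final Deque<Object> messages = new ArrayDeque<>();

        @Override
        public synchronized void put(Object msg) {
            messages.addLast(msg);   // never blocks: there is no capacity limit
            notifyAll();             // wake up receivers waiting in get()
        }

        @Override
        public synchronized Object get() throws InterruptedException {
            while (messages.isEmpty()) {
                wait();              // releases the monitor until a put() arrives
            }
            return messages.removeFirst();
        }
    }
}
```

Because put() never blocks, this corresponds to simple mailboxing: the receiver is the only side that ever waits.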
After having read the second article with its various Rock-Paper-Scissors implementations, I couldn’t resist the temptation to do my own implementation of Rock-Paper-Scissors using TMI. The rest of this post describes my implementation.
Before I continue, let me state that TMI is not an actor model. It is close in spirit, because it is a message passing model, but it does not honor the value passing semantics: you can pass whatever object you want as an argument from the sender or as a response object (rendezvous). Still, I consider TMI a very useful concurrency model that simplifies designing thread-based Java programs a lot.
Rock-Paper-Scissors using TMI
The main class is simple and straightforward, so let’s show it first. It creates two players and a coordinator and kicks the latter to start the game. The program runs for a specified number of seconds.
public class RockPaperScissorsUsingTMI {
    public static final String ROCK = "rock";
    public static final String PAPER = "paper";
    public static final String SCISSORS = "scissors";
    public static void main(String[] args) throws InterruptedException {
        int runIntervalInSeconds = 20;
        if (args.length > 0) runIntervalInSeconds = Integer.parseInt( args[0] );
        Player p1 = new Player(1);
        Player p2 = new Player(2);
        Coordinator c = new Coordinator(p1, p2);
        c.start(); p1.start(); p2.start();
        c.startPlay();
        Thread.sleep(runIntervalInSeconds * 1000);
    }
}
Here is a sample output of running the program.
>java -jar RockPaperScissorsUsingTMI-1.0.jar 15
0) Player-1: scissors -> paper
1) Player-1: scissors -> paper
2) Player-1: scissors -> paper
3) Player-1: rock -> scissors
4) Player-2: scissors -> paper
5) Player-2: rock -> scissors
6) TIE: scissors
7) TIE: scissors
8) Player-2: rock -> scissors
9) Player-2: scissors -> paper
10) Player-2: rock -> scissors
11) Player-1: rock -> scissors
12) Player-2: rock -> scissors
13) Player-1: scissors -> paper
14) Player-2: rock -> scissors
15) TIE: paper
16) Player-1: rock -> scissors
17) Player-2: paper -> rock
18) TIE: rock
>
The Player
The player class below is simple, but illustrates the TMI realization. In this case I have chosen to stay close to the actor model and use an unbounded message queue. The interesting methods are play() and receive(), which show the ‘sender’ side and the ‘receiver’ side of a message queue. The sender invokes play(), which inserts a message into the queue. Because the queue is unbounded, this is not a blocking call. It’s easy to change the queue to a rendezvous queue, which results in the sender being blocked until the receiver puts a reply. Inside the run() method, a player receives a message carrying the coordinator; the player is blocked as long as the input queue is empty. It then waits for a random amount of time and performs a move. Finally, it responds to the coordinator, passing itself as the argument.
import java.util.Arrays;
import java.util.List;
import java.util.Random;
// ROCK, PAPER and SCISSORS are assumed to be statically imported from RockPaperScissorsUsingTMI;
// MessageQueue and UnboundedMessageQueue come from the TMI library (imports not shown).
public class Player extends Thread {
    private static final List<String> moves = Arrays.asList(ROCK, PAPER, SCISSORS);
    private static final Random r = new Random();
    private MessageQueue playMsgs = new UnboundedMessageQueue();
    private String move;
    public Player(int id) {
        super("Player-"+id);
        setDaemon(true);
    }
    public void run() {
        try {
            while (true) {
                Coordinator c = receive();   // blocks while the input queue is empty
                delay( r.nextInt(1000) );
                move = makeMove();
                c.throwResult(this);         // reply, using the player itself as the message
            }
        } catch (InterruptedException e) {System.out.println(getName()+" Interrupted");}
    }
    public void play(Coordinator master) {   // sender side: does not block on an unbounded queue
        try {
            playMsgs.put(master);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    private Coordinator receive() throws InterruptedException {   // receiver side
        return (Coordinator) playMsgs.get();
    }
    private String makeMove() {
        return moves.get( r.nextInt(moves.size()) );
    }
    public String getMove() {
        return move;
    }
    protected Player setMove(String move) {
        this.move = move;
        return this;
    }
    private void delay(int numMilliSecs) {
        try {
            sleep(10 + numMilliSecs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the interrupt status
        }
    }
}
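The description above mentions that the unbounded queue could be swapped for a rendezvous queue. A minimal sketch of such a queue, assuming nothing about the real TMI API (RendezvousQueue, Envelope, call and receive are hypothetical names), pairs each message with a one-shot reply slot: the sender's call() blocks until the receiver has taken the message and answered it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical rendezvous queue: call() returns only after the receiver has replied.
public class RendezvousQueue<M, R> {

    // What the receiver gets: the message plus a one-shot reply slot.
    public static final class Envelope<M, R> {
        private final M message;
        private final CompletableFuture<R> replySlot = new CompletableFuture<>();

        private Envelope(M message) { this.message = message; }

        public M message() { return message; }

        public void reply(R value) { replySlot.complete(value); }   // releases the sender
    }

    private final BlockingQueue<Envelope<M, R>> envelopes = new LinkedBlockingQueue<>();

    // Sender side: enqueue the message, then block until the receiver replies.
    public R call(M message) throws InterruptedException {
        Envelope<M, R> envelope = new Envelope<>(message);
        envelopes.put(envelope);
        try {
            return envelope.replySlot.get();
        } catch (ExecutionException e) {
            // Not reachable here: replySlot is only ever completed normally.
            throw new IllegalStateException(e.getCause());
        }
    }

    // Receiver side: block until a message arrives; the receiver must reply() to it.
    public Envelope<M, R> receive() throws InterruptedException {
        return envelopes.take();
    }
}
```

A receiver loop would typically call receive(), compute a result from message(), and then reply() to release the sender.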
The Coordinator
The coordinator is slightly more complex, due to its responsibilities. It waits for a start message, then asks each player to play, and receives the throw messages in either order. It then evaluates who won the round. The concurrency part is not very complicated. It has two unbounded queues, one for each message type, and for each message type it has a public sender method (startPlay/throwResult) and a private receive method (receiveStart/receiveThrow). The Permutation class is just a helper for computing the winner.
// ROCK, PAPER and SCISSORS are assumed to be statically imported from RockPaperScissorsUsingTMI;
// MessageQueue and UnboundedMessageQueue come from the TMI library (imports not shown).
public class Coordinator extends Thread {
    private MessageQueue startMsgs = new UnboundedMessageQueue(),
                         throwMsgs = new UnboundedMessageQueue();
    private Player player1, player2;
    public Coordinator(Player player1, Player player2) {
        super("Coordinator");
        this.player1 = player1;
        this.player2 = player2;
        setDaemon(true);
    }
    public void run() {
        try {
            for (int count = 0; true; count++) {
                receiveStart();
                player1.play(this);
                player2.play(this);
                Player first  = receiveThrow(),   // whichever player answers first
                       second = receiveThrow();
                if (isTie(first, second)) {
                    System.out.printf("%4d) %8s: %s%n", count, "TIE", first.getMove());
                } else if (isWinning(first, second)) {
                    System.out.printf("%4d) %8s: %s -> %s%n",
                                      count, first.getName(), first.getMove(), second.getMove());
                } else {
                    System.out.printf("%4d) %8s: %s -> %s%n",
                                      count, second.getName(), second.getMove(), first.getMove());
                }
                this.startPlay();                 // kick off the next round
            }
        } catch (InterruptedException e) {System.out.println(getName()+" Interrupted");}
    }
    public void startPlay() throws InterruptedException {
        startMsgs.put(this);
    }
    private void receiveStart() throws InterruptedException {
        startMsgs.get();
    }
    public void throwResult(Player p) throws InterruptedException {
        throwMsgs.put(p);
    }
    private Player receiveThrow() throws InterruptedException {
        return (Player) throwMsgs.get();
    }
    protected boolean isWinning(Player first, Player second) {
        Permutation permutation = new Permutation(first, second);
        return permutation.dominates(ROCK    , SCISSORS) ||
               permutation.dominates(PAPER   , ROCK)     ||
               permutation.dominates(SCISSORS, PAPER);
    }
    protected boolean isTie(Player first, Player second) {
        return new Permutation(first, second).tie();
    }
    private static class Permutation {
        private Player first, second;
        private Permutation(Player first, Player second) {
            this.first = first;
            this.second = second;
        }
        public boolean dominates(String m1, String m2) {
            return first.getMove().equals(m1) && second.getMove().equals(m2);
        }
        public boolean tie() {
            return first.getMove().equals( second.getMove() );
        }
    }
}
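As an aside, the game rules that Permutation encodes can also be expressed as a lookup table from each move to the move it beats. The Rules class below is a hypothetical alternative, not part of the original program; it uses the same string values as the ROCK, PAPER and SCISSORS constants and needs Java 9 or later for Map.of.

```java
import java.util.Map;

// Hypothetical alternative to the Permutation helper: each move maps to the move it beats.
public class Rules {
    private static final Map<String, String> BEATS = Map.of(
            "rock", "scissors",
            "paper", "rock",
            "scissors", "paper");

    /** Returns 1 if the first move wins, -1 if the second move wins, and 0 on a tie. */
    public static int compare(String first, String second) {
        if (first.equals(second)) return 0;
        return BEATS.get(first).equals(second) ? 1 : -1;
    }
}
```

With it, the three-way branch in run() could be driven by Rules.compare(first.getMove(), second.getMove()).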
Source code
My application is developed using Maven, which means it is very easy to download and unpack the sources and then type
mvn package
java -jar target\RockPaperScissorsUsingTMI-1.0.jar 60

