Towards Ada Style Thread Communication in Java
or How to implement Thread Method Invocation
Jens Riboe
September 2002
Table of Contents
Problem 1 – Why you need to use synchronized
Problem 2 – Why you need to use wait and notify
Problem 3 – Why you should use notifyAll instead of notify
Problem 4 – Why you should encapsulate a mailbox on the receiving side
Problem 5 – Why you should use bounded queues
Problem 6 – Why you sometimes can’t suspend the sender, when the queue is full
A small class library for thread message queues
TMI – Thread Method Invocation
Problem 7 – Why you should consider Thread Method Invocation (TMI)
Non-Deterministic Message Acceptance
Problem 8 – Why you sometimes must combine TMI with non-determinism
Problem 9 – Why you might need guards when you are using non-determinism
The Java platform has introduced threads as a standard tool for solving
day-to-day programming tasks. Before the Java era, thread programming was
something for specialists working with POSIX or Win32 threads in C or C++.
However, thread-based programs may suffer from a completely new category of
bugs, essentially different from traditional bugs. Many of these bugs originate
from misconceptions about how threads communicate.
In this article, I will show you step by step how to build up a robust and
reliable library for thread communication. The presentation is problem driven,
which means I will formulate and demonstrate a bug, then analyse it and finally
present a solution.
The article starts with the very basics: how to send some data from one thread
to another. It then introduces the concept of rendezvous, taken from the Ada
programming language, and renames it Thread Method Invocation (TMI). Finally,
it discusses the idea of non-deterministic message acceptance, represented by
the SELECT statement in Ada. All discussions lead to the definition of a
small class library for thread communication, which provides both basic
primitives and more complex ones.
This section discusses and motivates the minimum amount of code needed to
transfer a data message from one thread to another. It concludes with how to
properly use the keyword synchronized and the methods wait, notify and
notifyAll. If you are already very familiar with these, you might want to skim
this section and proceed to the next.
When two (or more) threads want to exchange data, they need to agree on a
common place where one thread can put data and another can get it. This place
is often referred to as a critical region, which loosely speaking means it’s
critical to the correctness of the program execution. To understand the
characteristics of such a region, let’s dive into a model problem that
illustrates the difficulty and then how to solve it.
Consider a
single bank account object containing an integer balance value. What happens if
many actors update the account concurrently? Here is the class declaration for
the Account class.
class Account {
    private int balance = 0;
    public Account() {}
    public void updateBalance(int amount) {
        // balance += amount;
        int b = getBalance(); //READ
        b = b + amount;       //MODIFY
        setBalance(b);        //WRITE
    }
    public int getBalance() {return balance;}
    private void setBalance(int b) {balance = b;}
}
As you can
see above, I have decomposed the update statement into three statements to make
it clear exactly what is going on. To perform an update account operation we
need to read, modify and write data. These three operations are part of every
kind of update operation, independent of context.

In addition
to the Account class, we need an updater thread. A thread can loosely be viewed
as a separate program, which means, it has its own program statement pointer
and a function call stack. Threads consist of a little bit more than that, but
we leave that out for now.
The
clearest way to define a thread in Java is to extend the java.lang.Thread
class. A thread object in Java behaves like an ordinary object, plus it can
execute its run method concurrently with other threads. Let’s skip the details
of threads and focus on the significant code in the run method of the thread
class Updater. Here it is:
int amount = 100;
for (int i = 0; i < numTransactions; i++) {
    account.updateBalance(amount);
}
for (int i = 0; i < numTransactions; i++) {
    account.updateBalance(-amount);
}
As you can
see above, an updater thread increments and decrements the balance of the
account, but the net effect, after completion, is that nothing has changed,
i.e. the balance is still zero. You can view the complete source code, if you
scroll to the end of this article. At the end, you can also find a zip file
containing all source code for this article, plus an Ant build script and
needed jar files.
Now, let’s
run the program and inspect the final balance. I have simplified the command
line and the outputs for clarity. You can find detailed run instructions in the
zip file.
>java AccountUpdater
NumUpdaters : 5
NumTransactions: 10000
----------------
Final balance = 0
>java AccountUpdater -u 10 -t 100000
NumUpdaters : 10
NumTransactions: 100000
----------------
Final balance = 0
It seems to be working perfectly; end of story. Eh,… not quite. Let’s run it
again, but this time with a substantially higher load.
>java AccountUpdater -u 10 -t 10000000
NumUpdaters : 10
NumTransactions: 10000000
----------------
Final balance = 1154011500
>java AccountUpdater -u 10 -t 10000000
NumUpdaters : 10
NumTransactions: 10000000
----------------
Final balance = -1652878200
As you can
see above, we clearly have a bug. When the number of transactions is high, the
final balance has a random value. Why?
To understand the nature of the problem we need to understand how threads
execute and to recap the read/modify/write operation. A thread executes as a
small program and it proceeds uninterrupted until something forces it to stop.
Besides the trivial reason that the program has ended, it also takes a break
when it waits, for example, for a disc I/O operation to complete. However, that
doesn’t explain our program, because we don’t have such operations in it.
Most modern operating systems and the corresponding Java implementations use
so-called pre-emptive scheduling, which means that each thread executes for, at
most, a specified amount of time. Then it is forced to take a break (suspend)
to allow other threads to execute. Eventually, the thread resumes its execution
for another time slice. Either it hits the end of the slice or it performs a
suspending operation, like a write operation to disc. In both cases, it is
suspended again. The end-of-slice suspension is performed by a timer interrupt.
An interrupt can occur at any time, for example inside a read/modify/write
operation.
Let’s trace
the execution of the updateBalance method. I have expanded the method body into
the caller’s for loop, so you can see what’s going on.
for (int i = 0; i < numTransactions; i++) {
    // balance += amount;
    int b = getBalance(); //READ
    b = b + amount;       //MODIFY
    setBalance(b);        //WRITE
}
Pretend
that we have only two updater threads. Thread 1 executes the loop, for example
10,000 times, until it reaches the end of its time slice and thread 2 starts
executing the loop. The end of a slice might appear after the modify statement,
but before the write statement. If that happens, the member variable balance
has the value 1,000,000 but the local variable b has the value 1,000,100.
The execution now proceeds with thread 2, which increases the balance to, say,
2,000,000. When thread 1 resumes its execution, it overwrites the balance with
the value in the local variable b. Be aware that all local variables are local
to a thread, i.e., we have one b for each thread. Instead of continuing from
2,000,000, thread 1 now continues from 1,000,100.
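
The interleaving just described can be replayed deterministically in a single thread. The following is only a minimal sketch and not part of the article's source code: the class name LostUpdateReplay and the method replay are hypothetical, and the inner loop merely stands in for whatever thread 2 manages to do during its time slice.

```java
// Hypothetical illustration: replays the lost-update interleaving step by step
// in one thread, so the outcome is the same on every run.
class LostUpdateReplay {
    static int replay() {
        int balance = 1000000;        // shared balance when thread 1 is interrupted

        int b1 = balance;             // thread 1: READ
        b1 = b1 + 100;                // thread 1: MODIFY (b1 is now 1,000,100)

        // Thread 1's time slice ends here. Thread 2 performs 10,000 complete
        // updates, each with its own local variable b2.
        for (int i = 0; i < 10000; i++) {
            int b2 = balance;         // thread 2: READ
            b2 = b2 + 100;            // thread 2: MODIFY
            balance = b2;             // thread 2: WRITE
        }
        // The shared balance is now 2,000,000.

        balance = b1;                 // thread 1 resumes: WRITE discards thread 2's work
        return balance;
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + replay()); // prints "Final balance = 1000100"
    }
}
```

All 10,000 updates made by thread 2 vanish with a single stale write, which is why one badly placed interrupt is enough to corrupt the final result.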
One single
interrupt at the “wrong” place completely spoils the result of the program. The
likelihood that an end of a time slice appears after read but before write
is quite low, because there are more statements to execute than the three in
the loop body. Therefore, we need to increase the number of transactions to
more than a million before we can discover any problem.
The read/modify/write sequence is traditionally called a critical (code)
section, because it’s critical for the correctness of the program result. The
result ends up incorrect because of a timer interrupt in between read and
write, but all interrupts outside the critical section are harmless.
Java has a proper solution to the problem: declaring the updateBalance method
as synchronized. This keyword guarantees that only one thread at a time can
execute inside the synchronized methods of a given object. If the section is
occupied, all other threads must wait in a queue outside the section.
I have
added a subclass to Account. Here it is
class SafeAccount extends Account {
    public synchronized void updateBalance(int amount) {
        super.updateBalance(amount);
    }
}
Running the demo once more with the same arguments as the last run and the new
Account class gives the output below.
>java AccountUpdater -u 10 -t 10000000 +s
NumUpdaters : 10
NumTransactions: 10000000
UseSynchronized: true
----------------
Final balance = 0
Well, it’s
not a proof, but you have to trust me – it is the solution to the problem. Run
the program with a large number of transactions and threads, and convince
yourself.
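
If you don't have the article's zip file at hand, the experiment can be approximated with a small self-contained program. This is a sketch under assumptions, not the AccountUpdater program itself: the names SyncAccount and SyncAccountDemo are hypothetical, and the lambda syntax assumes Java 8 or later.

```java
// Hypothetical, self-contained sketch; not the article's AccountUpdater program.
class SyncAccount {
    private int balance = 0;

    // The whole read/modify/write sequence runs while the object's lock is held.
    public synchronized void updateBalance(int amount) {
        balance += amount;
    }

    public synchronized int getBalance() {
        return balance;
    }
}

class SyncAccountDemo {
    // Starts numUpdaters threads that each add and then subtract the same
    // amount numTransactions times, waits for them, and returns the balance.
    static int run(int numUpdaters, int numTransactions) {
        final SyncAccount account = new SyncAccount();
        final int amount = 100;
        Thread[] updaters = new Thread[numUpdaters];
        for (int t = 0; t < numUpdaters; t++) {
            updaters[t] = new Thread(() -> {
                for (int i = 0; i < numTransactions; i++) account.updateBalance(amount);
                for (int i = 0; i < numTransactions; i++) account.updateBalance(-amount);
            });
            updaters[t].start();
        }
        try {
            for (Thread u : updaters) u.join();   // wait for every updater to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return account.getBalance();
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + run(10, 100000)); // prints "Final balance = 0"
    }
}
```

Removing the synchronized keyword from updateBalance turns this back into the unsafe variant, although whether a wrong balance actually shows up in a given run depends on the scheduler.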
Bear in
mind that the execution time will be substantially longer than before. The
reason is that we now have many more thread context switches. Every time a
thread completes an updateBalance operation and leaves the critical section for
the next turn in the loop, the queue is non-empty, which causes the thread to
wait in the queue instead and another thread to enter the section.
Instead of marking a method as synchronized, it’s possible to use a
synchronized block. Such a block takes an object argument, which is then used
for synchronization. Here is an equivalent to the synchronized method above,
using a synchronized block:
public void updateBalance(int amount) {
    synchronized (this) {
        super.updateBalance(amount);
    }
}
You can use an arbitrary object as the synchronization object, for example:
public void updateBalance(int amount) {
    synchronized (System.out) {
        super.updateBalance(amount);
    }
}
Sometimes you must use a synchronized block, but usually you should stick to a
synchronized method, because it makes life a little bit simpler. Using a single
global synchronization object, such as System.out, means you have exactly one
critical section in your program, which is seldom what you want.
Using the
keyword this as the argument to a synchronized block might be correct depending
on the context. Can you see why the usage below is wrong?
class Data {public int value=0;}
class Updater extends Thread {
    private Data d;
    public Updater(Data d) {this.d = d;}
    public void run() {
        while (true) {
            synchronized (this) {d.value += 100;}
        }
    }
}
Every
updater thread is using its own synchronization object, instead of a common
one. This is a kind of bug that easily slips through a code review and can be
very difficult to spot later, when the application has been shipped to the
customers. You can easily correct the error, by changing to
synchronized (d) {d.value += 100;}
However,
the best solution is to change the Data class to use synchronized methods
instead.
Using synchronized methods is not a silver bullet. You must also know what is
proper usage. For example, suppose you have a class with synchronized public
methods and add a new public method, but forget to declare it as synchronized.
Now you have created a backdoor into the critical section.
This bug can be very hard to find, because a class, in general, has many
methods, not all of which necessarily need to be synchronized. Therefore, you
can’t just say “make all methods synchronized”, although you might want to say
“make all public methods synchronized”.
I’m sorry
to tell you that it doesn’t stop there either. Take a look at this class, which
has all of its methods declared synchronized.
class Account {
    private int balance=0;
    public synchronized void setBalance(int b) {balance=b;}
    public synchronized int getBalance() {return balance;}
}
The class
declaration looks irrefutable. However, what do you say about its usage?
void performUpdate(Account a) {
a.setBalance(a.getBalance() + 100);
}
The two
methods are synchronized, but not the composite operation read/modify/write.
In fact, this is a reincarnation of our original problem. Therefore, you must
always analyse how a synchronized class should be used. The easy answer is to
skip setters and stick to synchronized getters and updaters. An updater method
takes an argument and performs the complete read/modify/write operation.
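
As an illustration of the getter-plus-updater style, here is a minimal sketch. The names UpdaterAccount, add and UpdaterAccountDemo are hypothetical and do not appear in the article's source code; the lambda syntax assumes Java 8 or later.

```java
// Hypothetical sketch of a class that offers a getter and an updater, but no setter.
class UpdaterAccount {
    private int balance = 0;

    public synchronized int getBalance() {
        return balance;
    }

    // Updater: the complete read/modify/write runs while the lock is held,
    // so callers cannot split it into separately locked steps.
    public synchronized void add(int amount) {
        balance = balance + amount;
    }
}

class UpdaterAccountDemo {
    // Lets numThreads threads each add 100 to a shared account numUpdates times.
    static int run(int numThreads, int numUpdates) {
        final UpdaterAccount account = new UpdaterAccount();
        Thread[] threads = new Thread[numThreads];
        for (int t = 0; t < numThreads; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < numUpdates; i++) account.add(100);
            });
            threads[t].start();
        }
        try {
            for (Thread th : threads) th.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return account.getBalance();
    }

    public static void main(String[] args) {
        System.out.println("balance = " + run(4, 20000)); // prints "balance = 8000000"
    }
}
```

Because the class exposes no setter, a caller has no way to write the unsafe a.setBalance(a.getBalance() + 100) combination in the first place.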
Now you know a lot about the usage of synchronized. However, we have not yet
reached our original objective: how to send data from one thread to another. I
started this section with the observation that two threads that want to
communicate must agree on a common place where one can put data and the other
can get data. The Account class is a typical common place and we can use it as
the basis for our next class, a mailbox.
Consider two threads, Producer and Consumer, which want to communicate using a
common place (mailbox). Let’s rename the last version of class Account and
change the name of the member variable. Here is the new class, called a
mailbox:
class MailBox {
    private int msg = 0;
    public synchronized void put(int msg) {this.msg=msg;}
    public synchronized int get() {return msg;}
}
The
producer thread sends a stream of 1’s to the mailbox, by calling the method put
and the consumer is receiving these 1’s by calling the method get on the
mailbox object.

Below you
can find the class declarations for both the producer and the consumer
class Producer extends Thread {
    private MailBox out;
    private int numMessages;
    public Producer(int numMessages, MailBox out) {
        this.out = out;
        this.numMessages = numMessages;
    }
    public void run() {
        for (int i = 0; i < numMessages; i++) {
            out.put(+1);
        }
        out.put(-1);
        System.out.println("num = "+numMessages);
    }
}
class Consumer extends Thread {
    private MailBox in;
    public Consumer(MailBox in) {
        this.in = in;
    }
    public void run() {
        int sum=0, cnt=0, n;
        while ((n = in.get()) >= 0) {
            sum += n;
            cnt++;
        }
        System.out.println("sum = " + sum);
        System.out.println("cnt = " + cnt);
    }
}
If the
producer sends 1000 1’s, I guess that you will expect the value of sum and cnt
in the consumer to be 1000. Right? So, let’s run the program and check. I have
modified the output for clarity. You can find a link to the source code in the
end of this article, where you also can find a build script and run
instructions.
>java NumberCounter
NumMessages : 1000
----------------
Producer: num = 1000
Consumer: sum = 0
Consumer: cnt = 527361
>java NumberCounter -m 10000
NumMessages : 10000
----------------
Producer: num = 10000
Consumer: sum = 0
Consumer: cnt = 0
>java NumberCounter -m 1000000
NumMessages : 1000000
----------------
Producer: num = 1000000
Consumer: sum = 899622
Consumer: cnt = 1424308
Interesting,
eh…? The sum of 1’s is always different from the number of 1’s sent and the
number of turns in the consumer’s loop is always different from the sum of the
1’s. Strange, to say the least.
Look at the program code once more. The msg member variable is initialised to
0. If the consumer starts executing first, it will read the value 0 from the
mailbox several times. The sum value remains zero, but the cnt value increases.
Eventually, the consumer’s time slice ends and the producer starts executing.
For a moderate number of messages, the producer can complete its for loop
during one time slice and, just before termination, put the value -1 in the
mailbox. The consumer will then resume, pick up the -1 and break out of its
while loop.
If the producer starts executing first, it is likely that it can complete its
for loop and leave a -1 in the mailbox, which then causes the consumer to break
out of its while loop immediately.
If the number of messages is large, the two threads will switch a couple of
times, which results in a positive value of the sum variable.
Java has, of course, a solution to this problem. Every class inherits directly
or indirectly from the class java.lang.Object. The Object class contains, among
others, two methods of importance in this context. The wait method suspends the
calling thread and the notify method resumes another thread that has previously
been suspended by a call to wait on the same object. These methods must be
called inside a synchronized block (or method) on that object, otherwise an
IllegalMonitorStateException will be thrown. Nevertheless, how can this solve
our problem?
The root cause of the incorrect result is that the two threads don’t wait for
each other. When the producer has sent one message, it should wait until the
consumer has received it. Moreover, the consumer must wait for the producer to
send the next message. This is generally called event synchronization. A thread
might wait for an event (or condition) to occur and another thread might cause
the event to occur (notify).
Let’s apply this insight and implement a subclass of MailBox that has event
synchronization. The conditions to wait on are that the mailbox is full and
that it is empty, respectively. The producer calls put and suspends itself if
the mailbox is already full, and the consumer suspends itself if the mailbox is
empty when it calls get. Here is the subclass:
class SafeMailBox extends MailBox {
    private boolean isFull = false;
    public synchronized void put(int msg) {
        while (isFull)
            try{ wait(); }catch(InterruptedException x) {}
        super.put(msg);
        isFull = true;
        notify();
    }
    public synchronized int get() {
        while (!isFull)
            try{ wait(); }catch(InterruptedException x) {}
        int msg = super.get();
        isFull = false;
        notify();
        return msg;
    }
}
At the end of each method, notify is called to tell the other side that its
waiting condition has changed. When a thread resumes after a wait, it rechecks
the condition to be sure. This is a good habit and should always be applied
instead of using a simpler if-statement. If we re-run the program now with the
new mailbox implementation, it works as expected. Try it for yourself! You can
download the source code, if you scroll to the end of this article. Here is the
output:
>java NumberCounter -m 1000 +s
NumMessages : 1000
UseWait&Notify: true
----------------
Producer: num = 1000
Consumer: sum = 1000
Consumer: cnt = 1000
The new
mailbox contains the minimum amount of code needed for two threads to safely
interchange messages. There are variations on the theme, as we will see later
in this article, but no short cuts. Every attempt to leave out wait/notify will
cause a bug.
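
The same handshake can be tried out as one self-contained program. This is a sketch under assumptions rather than the article's NumberCounter source: the names SlotMailBox and SlotMailBoxDemo are hypothetical, InterruptedException is propagated instead of swallowed, and the lambda syntax assumes Java 8 or later.

```java
// Hypothetical one-slot mailbox for exactly one producer and one consumer.
class SlotMailBox {
    private int msg;
    private boolean isFull = false;

    public synchronized void put(int value) throws InterruptedException {
        while (isFull) wait();      // wait until the consumer has emptied the slot
        msg = value;
        isFull = true;
        notify();                   // wake the consumer, if it is waiting
    }

    public synchronized int get() throws InterruptedException {
        while (!isFull) wait();     // wait until the producer has filled the slot
        isFull = false;
        notify();                   // wake the producer, if it is waiting
        return msg;
    }
}

class SlotMailBoxDemo {
    // Sends numMessages 1's followed by -1; returns {sum, cnt} as seen by the consumer.
    static int[] run(final int numMessages) {
        final SlotMailBox box = new SlotMailBox();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < numMessages; i++) box.put(1);
                box.put(-1);        // end-of-stream marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0, cnt = 0;
        try {
            int n;
            while ((n = box.get()) >= 0) {   // the calling thread acts as consumer
                sum += n;
                cnt++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {sum, cnt};
    }

    public static void main(String[] args) {
        int[] result = run(1000);
        System.out.println("sum = " + result[0]); // prints "sum = 1000"
        System.out.println("cnt = " + result[1]); // prints "cnt = 1000"
    }
}
```

With a single producer and a single consumer, the only thread that can be waiting is the one on the other side, so a plain notify always reaches the right thread.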
An object using synchronized and wait/notify is usually referred to as a
monitor in concurrency theory. A monitor consists of (shared) data and a set of
access methods. The only way to access the data is to call one of the access
methods. This guarantees that only one thread at a time can be executing inside
a monitor. To better understand the semantics of a monitor, look at the
illustration below.

A monitor (synchronized object) uses two queues and a lock to realize its
semantics. When a thread calls one of the access methods, the lock is
inspected. If it is unlocked, the thread becomes the lock owner and the monitor
is now locked until the thread leaves the monitor. If the monitor is already
locked, the thread is suspended and inserted into the first queue (enterQ).
A thread executing inside a monitor might want to check a synchronization event
(condition) and, if needed, suspend itself until the event occurs. This is
performed by the wait method in Java, and the thread is inserted into the
second queue (waitQ). The monitor is also unlocked, to allow another thread in
enterQ to acquire the monitor.
Eventually, a thread inside the monitor calls the method notify, which moves
one thread from waitQ to the front of enterQ. Once the notifying thread has
left the monitor and becomes suspended, because of an end-of-time-slice
interrupt or some kind of suspending operation, the awoken thread resumes its
execution after the wait statement with the monitor re-acquired. The
explanation above contains some simplifications, but serves as a good model for
monitors.
Java implements a so-called resume&continue monitor, which means that the
notifying thread wakes up a thread from waitQ, but continues executing inside
the monitor. The alternative is a resume&wait monitor, where the notifying
thread becomes suspended and the awoken thread starts executing inside the
monitor. A third alternative is a so-called implicit resume monitor. This
variant lacks wait and notify primitives. Instead, it has entrance conditions
(called guards) and re-checks them every time a thread leaves the monitor. This
variant is an important concurrency building block in the real-time programming
language Ada95.
Now, you
know how to set up a communication link between two threads, but not between
more than two! Follow on to the next problem to understand why.
Let’s modify our latest program a little bit, so that we allow more than one
producer and let each of them send the sequence 1,2…n. The (single) consumer
adds all numbers and prints out the result when the program terminates. If we
have p producer threads, the result should be p * n * (n+1) / 2.
I’m using the same mailbox as before, slightly modified. Here it is.
class MailBox {
    protected int data = 0;
    protected boolean isFull = false;
    public synchronized void put(int msg) {
        while (isFull)
            try{ wait(); }catch(InterruptedException x) {}
        data = msg;
        isFull = true;
        doNotify();
    }
    public synchronized int get() {
        while (!isFull)
            try{ wait(); }catch(InterruptedException x) {}
        int msg = data;
        isFull = false;
        doNotify();
        return msg;
    }
    protected void doNotify() {notify();}
}
Here are the producer and consumer thread classes. I have simplified the code
for clarity. You can find a link to the complete source code at the end of this
article.
class Producer extends Thread {
    private MailBox out;
    private int numMessages;
    public Producer(int numMessages, MailBox out) {
        this.out = out;
        this.numMessages = numMessages;
    }
    public void run() {
        for (int i = 1; i <= numMessages; i++) {
            out.put(i);
        }
    }
}
class Consumer extends Thread {
    private MailBox in;
    private int sum = 0;
    public Consumer(MailBox in) {
        this.in = in;
    }
    public void run() {
        int n;
        while ((n = in.get()) >= 0) {
            sum += n;
        }
        System.out.println("sum = " + sum);
    }
}
The main
program is responsible for creating a mailbox, the consumer, and a number of
producers. It then waits for the completion of all producers, sends a –1 via
the mailbox to the consumer and waits for the completion of the consumer. Now,
let’s run the program with two producers and see what happens.
1) >java NumberSequencer +v -p 2
2) NumProducers: 2
3) Max : 10
4) UseNotifyAll: false
5) Consumer: Started
6) Consumer: Mailbox<data=0, isFull=false> get(): enter
7) Consumer: Mailbox<data=0, isFull=false> get(): before wait
8) Producer-1: Started
9) Producer-1: OUT 1
10) Producer-1: Mailbox<data=0, isFull=false> put(msg=1) enter
11) Producer-1: Mailbox<data=1, isFull=true> put(msg=1) before notify
12) Producer-1: Mailbox<data=1, isFull=true> put(msg=1) exit
13) Producer-1: OUT 2
14) Producer-1: Mailbox<data=1, isFull=true> put(msg=2) enter
15) Producer-2: Started
16) Producer-2: OUT 1
17) Producer-1: Mailbox<data=1, isFull=true> put(msg=2) before wait
18) Consumer: Mailbox<data=1, isFull=true> get(): after wait
19) Consumer: Mailbox<data=1, isFull=false> get(): before notify
20) Consumer: Mailbox<data=1, isFull=false> get(): exit
21) Producer-2: Mailbox<data=1, isFull=false> put(msg=1) enter
22) Producer-2: Mailbox<data=1, isFull=true> put(msg=1) before notify
23) Producer-2: Mailbox<data=1, isFull=true> put(msg=1) exit
24) Producer-2: OUT 2
25) Producer-1: Mailbox<data=1, isFull=true> put(msg=2) after wait
26) Producer-1: Mailbox<data=1, isFull=true> put(msg=2) before wait
27) Producer-2: Mailbox<data=1, isFull=true> put(msg=2) enter
28) Consumer: IN 1
29) Producer-2: Mailbox<data=1, isFull=true> put(msg=2) before wait
30) Consumer: Mailbox<data=1, isFull=true> get(): enter
31) Consumer: Mailbox<data=1, isFull=false> get(): before notify
32) Consumer: Mailbox<data=1, isFull=false> get(): exit
33) Consumer: IN 1
34) Consumer: Mailbox<data=1, isFull=false> get(): enter
35) Consumer: Mailbox<data=1, isFull=false> get(): before wait
36) Producer-1: Mailbox<data=1, isFull=false> put(msg=2) after wait
37) Producer-1: Mailbox<data=2, isFull=true> put(msg=2) before notify
38) Producer-1: Mailbox<data=2, isFull=true> put(msg=2) exit
39) Producer-1: OUT 3
40) Producer-2: Mailbox<data=2, isFull=true> put(msg=2) after wait
41) Producer-2: Mailbox<data=2, isFull=true> put(msg=2) before wait
42) Producer-1: Mailbox<data=2, isFull=true> put(msg=3) enter
43) Producer-1: Mailbox<data=2, isFull=true> put(msg=3) before wait
If you try
it yourself, you will see a similar sequence as above and then nothing. The
program seems to always hang. What has happened?
Recall our model of a monitor from earlier. Let’s trace the execution from the
system state in which both producers are waiting in waitQ and the consumer
enters the monitor (line 30 in the listing above and picture 1 below).

The
consumer proceeds out of the monitor, receives the value 1 and notifies the
first thread in waitQ which is producer 1. The consumer manages to re-enter the
monitor, but finds it empty and calls wait (line 35 in the listing above). This
allows producer 1 to resume its execution after wait (line 36 and figure 2).
Producer 1 inserts value 2 into the mailbox and notifies the first thread (P2)
in waitQ (line 37). It then leaves the monitor (line 38). This allows producer
2 to wake up inside the monitor, after wait (line 40 and figure 3). However, it
discovers that the mailbox is full and goes to sleep again (line 41). Producer
1 enters the mailbox again, because it wants to send the value 3.
Unfortunately, the mailbox is still full and producer 1 goes to sleep too (line
43 and figure 4). This system state now remains forever…

The problem
in this scenario is that if the wrong thread is resumed from waitQ it will go
to sleep again, without giving another thread a chance to continue, for example
a consumer.
Luckily, Java has a solution to this problem too. We can use the method
notifyAll instead of notify. This method resumes all threads in waitQ and
allows them to enter the monitor one at a time. Let’s make a subclass of
MailBox, which fixes this problem. Here it is:
class SafeMailBox extends MailBox {
    protected void doNotify() {notifyAll();}
}
Running the
program once again, now with a mailbox that notifies all threads in waitQ,
gives the expected result.
>java NumberSequencer -v -p 2 +s
NumProducers: 2
Max : 10
UseNotifyAll: true
Consumer: Started
Producer-1: Started
Producer-1: Terminated
Producer-2: Started
Producer-2: Terminated
Consumer: Terminated
----------------
Consumer: sum = 110
Expected sum = 110
>java NumberSequencer -v -p 20 +s
NumProducers: 20
Max : 10
UseNotifyAll: true
----------------
Consumer: sum = 1100
Expected sum = 1100
>java NumberSequencer -v -p 100 +s -m
NumProducers: 100
Max : 100
UseNotifyAll: true
----------------
Consumer: sum = 505000
Expected sum = 505000
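
The many-producers case can also be reproduced without the zip file. This is a sketch under assumptions rather than the article's NumberSequencer source: the names BroadcastMailBox and BroadcastMailBoxDemo are hypothetical, InterruptedException is propagated instead of swallowed, and the lambda syntax assumes Java 8 or later.

```java
// Hypothetical one-slot mailbox that wakes every waiting thread on each change.
class BroadcastMailBox {
    private int data;
    private boolean isFull = false;

    public synchronized void put(int msg) throws InterruptedException {
        while (isFull) wait();
        data = msg;
        isFull = true;
        notifyAll();   // wake producers and the consumer; each rechecks its condition
    }

    public synchronized int get() throws InterruptedException {
        while (!isFull) wait();
        isFull = false;
        notifyAll();
        return data;
    }
}

class BroadcastMailBoxDemo {
    // numProducers threads each send 1..max; returns the sum seen by the consumer.
    static long run(int numProducers, int max) {
        final BroadcastMailBox box = new BroadcastMailBox();
        final long[] sum = new long[1];   // written by the consumer, read after join()
        Thread consumer = new Thread(() -> {
            try {
                int n;
                while ((n = box.get()) >= 0) sum[0] += n;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        Thread[] producers = new Thread[numProducers];
        for (int p = 0; p < numProducers; p++) {
            producers[p] = new Thread(() -> {
                try {
                    for (int i = 1; i <= max; i++) box.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producers[p].start();
        }
        try {
            for (Thread t : producers) t.join();
            box.put(-1);                  // all producers are done: stop the consumer
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("sum = " + run(2, 10)); // prints "sum = 110", i.e. 2 * 10 * 11 / 2
    }
}
```

The price of notifyAll is that every waiting thread wakes up and rechecks its condition, even though at most one of them can proceed.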
At last, we have a robust component for inter-thread communication. The
resulting mailbox is shown below. I took the liberty of changing the message
type to Object, making it a little more versatile.
class MailBox {
    protected Object data = null;
    protected boolean isFull = false;
    public synchronized void put(Object msg) {
        while (isFull)
            try{ wait(); }catch(InterruptedException x) {}
        data = msg;
        isFull = true;
        notifyAll();
    }
    public synchronized Object get() {
        while (!isFull)
            try{ wait(); }catch(InterruptedException x) {}
        Object msg = data;
        isFull = false;
        notifyAll();
        return msg;
    }
}
Our fourth
problem is not a problem in the same sense as the previous three. It’s more a
matter of convenience than incorrectness. However, going for a complicated
solution, instead of a simpler one, might result in a bug.
Let’s study a slightly more complicated situation than before. Instead of a
single communication pair, we have a long pipeline of communicating threads. A
producer sends a message to a transformer, which does something and forwards it
to another transformer, and eventually the pipeline ends with a consumer that
receives the result. Setting up this pipeline requires management of both
threads of different flavours and mailboxes in between every thread pair. It’s
cumbersome, to say the least, but also inflexible and tedious.
The
solution is to move the mailbox into the receiving side, as a member variable,
and supply a public method that takes care of the details. Here is an example
illustrating the general idea.
class Consumer extends Thread {
    private MailBox inbox = new MailBox();
    public void send(int msg) {inbox.put(Integer.valueOf(msg));}
    private int receive() {return ((Integer)inbox.get()).intValue();}
    public void run() {
        int n;
        while ((n = receive()) >= 0) {...}
    }
}
class Producer extends Thread {
    private Consumer next;
    public Producer(Consumer c) {next=c;}
    public void run() {
        for (...) { next.send(k); }
    }
}
The advantages of this arrangement are 1) the receiving side has encapsulated
the synchronization logic, including packing and unpacking of envelope objects,
and 2) the sending side calls the other thread using familiar method invocation
semantics. If we later want to change the mailbox type into another kind of
object (see the next sections in this article), it’s completely transparent to
the sending side.
The send
method of the consumer acts as a rendezvous point between a communicating
thread pair. It is important to observe that the method is executed in the
context of the sending side, i.e. not in the consumer thread’s context.
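
The point about execution context can be made visible by recording which thread runs each method. The following is only a sketch: ContextReceiver and ContextDemo are hypothetical names, an ArrayBlockingQueue of capacity 1 from java.util.concurrent stands in for the article's MailBox to keep the example short, and the lambda syntax assumes Java 8 or later.

```java
// Hypothetical sketch: records which thread executes send() and which executes run().
// An ArrayBlockingQueue of capacity 1 stands in for the article's MailBox.
class ContextReceiver extends Thread {
    private final java.util.concurrent.BlockingQueue<Integer> inbox =
            new java.util.concurrent.ArrayBlockingQueue<Integer>(1);
    volatile String sendThreadName;   // thread that last executed send()
    volatile String runThreadName;    // thread that executes run()
    int sum = 0;                      // read by other threads only after join()

    public void send(int msg) {
        sendThreadName = Thread.currentThread().getName();
        try {
            inbox.put(msg);           // blocks in the caller's thread while the slot is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void run() {
        runThreadName = Thread.currentThread().getName();
        try {
            int n;
            while ((n = inbox.take()) >= 0) sum += n;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class ContextDemo {
    static ContextReceiver run() {
        final ContextReceiver receiver = new ContextReceiver();
        receiver.setName("receiver");
        receiver.start();
        Thread sender = new Thread(() -> {
            receiver.send(1);
            receiver.send(2);
            receiver.send(-1);        // end-of-stream marker
        }, "sender");
        sender.start();
        try {
            sender.join();
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return receiver;
    }

    public static void main(String[] args) {
        ContextReceiver r = run();
        System.out.println("send() ran in: " + r.sendThreadName); // prints "send() ran in: sender"
        System.out.println("run() ran in: " + r.runThreadName);   // prints "run() ran in: receiver"
    }
}
```

Because send executes in the caller's thread, it is the caller that blocks when the receiver's inbox is occupied.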
The pipeline
problem mentioned above requires two kinds of receivable threads; a consumer
and a transformer. This calls for an abstraction manifested by an interface and
an abstract implementing class.
interface Receivable {
    void send(int msg);
}
abstract class AbstractReceivable extends Thread implements Receivable {
    private MailBox inbox = new MailBox();
    public void send(int msg) {inbox.put(Integer.valueOf(msg));}
    protected int receive() {return ((Integer)inbox.get()).intValue();}
}
By
extending the abstract class, we can define the two receivables. I show all
three thread classes below, plus a method that assembles a pipeline.
class Consumer extends AbstractReceivable {
    public void run() {
        int msg;
        while ((msg = receive()) >= 0) {
            System.out.println("Consumer: " + msg);
        }
    }
}
class Transformer extends AbstractReceivable {
    private Receivable next;
    public Transformer(Receivable n) {next=n;}
    public void run() {
        int msg;
        while ((msg = receive()) >= 0) {
            next.send(2 * msg);
        }
        next.send(-1);
    }
}
class Producer extends Thread {
    private Receivable next;
    private int N;
    public Producer(int N, Receivable r) {this.N=N; next=r;}
    public void run() {
        for (int i=1; i<=N; ++i) {
            next.send(i);
        }
        next.send(-1);
    }
}
class DoublerPipeline {
    public void start(int numTransformers, int N) {
        // Build the chain back to front and start every thread in it.
        AbstractReceivable r = new Consumer();
        r.start();
        for (int i=1; i<=numTransformers; ++i) {
            r = new Transformer(r);
            r.start();
        }
        Producer p = new Producer(N, r);
        p.start();
    }
}
Here is a
printout from running the program, with 10 transformers (doublers) and the
producer sending the values 1,2…n.
>java DoublerPipeline -d 10 -m 100
NumDoublers: 10
Max : 100
----------------
Consumer: Last received = 102400
Expected last received = 102400
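
The pipeline can also be tried out as one self-contained program. This is a sketch under assumptions, not the article's DoublerPipeline source: PipeStage, PipeConsumer, PipeDoubler and PipelineDemo are hypothetical names, an ArrayBlockingQueue of capacity 1 stands in for MailBox, and the calling thread plays the producer.

```java
// Hypothetical sketch: an ArrayBlockingQueue of capacity 1 stands in for MailBox.
abstract class PipeStage extends Thread {
    private final java.util.concurrent.BlockingQueue<Integer> inbox =
            new java.util.concurrent.ArrayBlockingQueue<Integer>(1);

    // Called by the upstream thread; blocks while the inbox is occupied.
    public void send(int msg) {
        try {
            inbox.put(msg);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Called by this stage's own thread; blocks while the inbox is empty.
    protected int receive() {
        try {
            return inbox.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;   // treat interruption as end of stream
        }
    }
}

class PipeConsumer extends PipeStage {
    int last = -1;       // read by the caller only after join()
    public void run() {
        int msg;
        while ((msg = receive()) >= 0) last = msg;
    }
}

class PipeDoubler extends PipeStage {
    private final PipeStage next;
    PipeDoubler(PipeStage next) { this.next = next; }
    public void run() {
        int msg;
        while ((msg = receive()) >= 0) next.send(2 * msg);
        next.send(-1);   // forward the end-of-stream marker
    }
}

class PipelineDemo {
    // Builds consumer <- doubler <- ... <- doubler, feeds 1..max, returns the last value received.
    static int run(int numDoublers, int max) {
        PipeConsumer consumer = new PipeConsumer();
        consumer.start();
        PipeStage head = consumer;
        for (int i = 0; i < numDoublers; i++) {
            head = new PipeDoubler(head);
            head.start();
        }
        for (int i = 1; i <= max; i++) head.send(i);   // the caller acts as producer
        head.send(-1);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumer.last;
    }

    public static void main(String[] args) {
        System.out.println("Last received = " + run(10, 100)); // prints "Last received = 102400"
    }
}
```

With 10 doublers the last value sent, 100, arrives as 100 * 2^10 = 102400, which matches the printout above.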
The benefits of moving the synchronization logic into the receiving side should
not be underestimated. Thread communication becomes as simple as object
communication. Below I show you a program that lists all prime numbers between
1 and N. The algorithm is very inefficient if you are truly interested in
finding prime numbers; however, it illustrates the threading concept elegantly.
It’s also a good benchmark for the threading performance of a language or
thread library, and I have implemented it in many different languages and
libraries over the years.

The program builds upon the classical algorithm that uses a chain of prime number sieves. Every sieve is a thread, and the program builds a chain that gets longer for every prime number found. The main program sends the number sequence 3,4…n to the first prime number sieve, which represents the number 2.
A sieve thread receives a number and checks whether it is divisible by the prime number of the sieve. If it is not divisible, the number is sent to the next sieve in the chain. A new sieve thread is created the first time this happens. The first thing a new sieve does is print the prime number it represents.
public class PrimeNumbers {
    public static void main(String[] args) {. . .}
    public void start(int max) {
        PrimeNumberSieve next = new PrimeNumberSieve(1, 2);
        for (int i=3; i<=max; ++i) {
            next.put(i);
        }
        next.put(-1);
    }
    private static class PrimeNumberSieve extends Thread {
        private Mailbox inbox = new Mailbox();
        private PrimeNumberSieve next = null;
        private int order;
        private int prime;
        public PrimeNumberSieve(int n, int prime) {
            this.order = n;
            this.prime = prime;
            start();
        }
        public void run() {
            System.out.println(order + ": " + prime);
            int n;
            while ((n = get()) > 0) {
                if (n % prime != 0) {
                    if (next == null)
                        next = new PrimeNumberSieve(order+1, n);
                    else
                        next.put(n);
                }
            }
            if (next != null) next.put(-1);
        }
        // The mailbox stores objects, so the int is wrapped and unwrapped here.
        public void put(int n) {inbox.put(new Integer(n));}
        private int get() {return ((Integer)inbox.get()).intValue();}
    }
}
Here is a
printout of one execution of the program, listing all prime numbers below 1000.
>java PrimeNumbers -m 1000
1: 2
2: 3
3: 5
4: 7
5: 11
6: 13
. . .
165: 977
166: 983
167: 991
168: 997
In the listing above, you can see that the program has created 168 threads. That means the last numbers travelled through 168 thread context switches to the end of the chain. Moreover, every thread must wait for the next thread to receive a sent number before it can send the next number. In this hobby program it doesn't matter, but in a real program you might have a chain of different threads with different processing times. Put plainly, the slowest thread in the chain slows down the whole chain.
If the processing times are fixed, this is no problem, because the chain can't operate any faster. On the other hand, if the times vary, it's inefficient to always wait for the slowest thread in the chain. This problem adds a new requirement to our mailbox implementation. Hang on to the next section, where this problem will be addressed.
A drawback of a single slot buffer, a.k.a. mailbox, is that the data transfer speed is limited by the processing time of an intermediary thread, because only one data message at a time can be in transit between a pair of threads. A solution that adds more flexibility is a multi slot buffer, which can even out differences in processing time between different threads and over time.
This section discusses how to properly implement such a buffer or, to use a better term, a message queue.
The most
obvious representation of a message queue is a linked list
(java.util.LinkedList). Below I show you a straightforward implementation. The
most important observation is that the queue lacks a call of wait in the put
method, because it’s not needed.
import java.util.LinkedList;
class UnboundedQueue {
    private LinkedList theQueue = new LinkedList();
    public synchronized void put(Object msg) {
        theQueue.addLast(msg);
        notifyAll();
    }
    public synchronized Object get() {
        while (theQueue.isEmpty())
            try { wait(); } catch (InterruptedException x) {}
        Object msg = theQueue.removeFirst();
        return msg;
    }
}
The queue is a member of a consumer thread, which receives blocks of bytes representing data messages. A producer creates such blocks and just sends them to the consumer. I don't bother to show you the rest of the program; having read this far, you should be quite familiar with producers and consumers. You can find the complete source code at the end of this article.
Let’s run
the program and study the output. The producer sends 10000 blocks of size 10000
bytes each, to the consumer. A special watcher thread prints some measures once
a second.
>java TimedPipeline -s 10000
Max = 10000 message
BlockSize = 10000 bytes
1) Consumer queue size = 163, Memory alloc/total/max = 1/3/128 Mb
2) Consumer queue size = 328, Memory alloc/total/max = 3/5/128 Mb
3) Consumer queue size = 501, Memory alloc/total/max = 5/5/128 Mb
4) Consumer queue size = 662, Memory alloc/total/max = 6/9/128 Mb
5) Consumer queue size = 836, Memory alloc/total/max = 8/9/128 Mb
. . .
35) Consumer queue size = 5975, Memory alloc/total/max = 61/63/128 Mb
36) Consumer queue size = 6070, Memory alloc/total/max = 58/63/128 Mb
37) Consumer queue size = 6246, Memory alloc/total/max = 60/63/128 Mb
38) Consumer queue size = 6426, Memory alloc/total/max = 62/63/128 Mb
Got java.lang.OutOfMemoryError
Consumer queue size = 6565, Memory alloc/total/max = 63/63/128 Mb
As you can see above, the program crashed after 38 seconds because the memory was exhausted. Why?
Well, the data block size is perhaps unrealistically large, and the JVM settings could have been adjusted to allow a larger heap. In addition, if you read the source code you will find a sleep statement in the consumer, which means the consumer sleeps 50 ms after it has consumed a data block. The producer and the consumer therefore don't work at the same processing speed. Consequently, the message queue grows larger and larger until the memory is exhausted.
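The numbers in the printout are consistent with a simple rate calculation. The sketch below redoes that arithmetic. The consumer rate follows from the 50 ms sleep; the producer rate is only an estimate derived from the queue growth between samples 1 and 38, and the class and method names are invented for this illustration.

```java
// Back-of-the-envelope check of the printout above.
// Assumption: the consumer's 50 ms sleep dominates its processing time.
class QueueGrowthEstimate {
    // Upper bound on messages consumed per second for a given sleep time.
    static double consumerRate(int sleepMillis) {
        return 1000.0 / sleepMillis;
    }

    // Average queue growth per second between two samples.
    static double growthRate(int sizeStart, int sizeEnd, int seconds) {
        return (double) (sizeEnd - sizeStart) / seconds;
    }

    // Payload bytes held by the queue, converted to MiB.
    static double queuedMiB(int messages, int blockSize) {
        return (double) messages * blockSize / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        double consume = consumerRate(50);          // at most 20 messages per second
        double growth = growthRate(163, 6426, 37);  // samples 1 and 38 of the printout
        System.out.println("consumer rate  <= " + consume + " msg/s");
        System.out.println("queue growth   ~  " + growth + " msg/s");
        System.out.println("producer rate  ~  " + (consume + growth) + " msg/s");
        System.out.println("queued payload ~  " + queuedMiB(6565, 10000) + " MiB");
    }
}
```

The queue grows by roughly 169 messages per second, and the 6565 queued blocks alone hold about 62.6 MiB of payload, which is in line with the 63 Mb of allocated memory reported just before the OutOfMemoryError.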
If we
adjust the message size to something smaller and increase the JVM heap, the
program will run for a longer time. Perhaps several hours or days, but
eventually it will crash due to the same reason as above. On my machine, it
took 47 minutes until crash, with a message size of 100 bytes.
The root
cause is the difference in processing time between the producer and the
consumer. In this demo program it’s a fixed sleep time, but in a real
application the processing time has a natural variation and is a consequence of
different tasks performed.
If the
application, using an unbounded buffer, is a 24/7 application, it can be quite
unpleasant if it crashes once a week. What we have is a resource (memory) leak,
similar to what is common in C/C++ programs.
Unbounded message queues as the basic primitive for thread communication can be a burden for a programming language. The real-time programming language Erlang, developed by Ericsson in the beginning of the 90's, suffered from that problem. Erlang has many similarities with Java, except for the syntax, which is derived from Prolog. As a response to many reports of memory leaks, Ericsson eventually developed a platform on top of the language, with bounded communication as one of its primary objectives.
We can
easily solve the problem, by re-introducing a call to wait in the put method.
The new kind of buffer is a bounded message queue. A producer that calls put
will be suspended if the queue is full. Our previous mailbox is a single slot
queue.
There are
several ways to realize a bounded queue. The simplest is to use the same
implementation as above and add logic to compare the queue size against a
predefined max size and call wait if needed.
public synchronized void put(Object msg) {
    while (theQueue.size() >= maxSize)
        try { wait(); } catch (InterruptedException x) {}
    theQueue.addLast(msg);
    notifyAll();
}
The drawback might be a slight performance reduction compared to the alternative implementation I will show you below. In addition, this alternative opens up some extensions I will discuss later in this article.
class BoundedQueue {
    private Object[] theQueue;
    private int putIdx = 0, getIdx = 0, size = 0;
    public BoundedQueue(int nSlots) {
        theQueue = new Object[nSlots];
    }
    public synchronized void put(Object msg) {
        while (isFull())
            try { wait(); } catch (InterruptedException e) {}
        insert(msg);
        notifyAll();
    }
    public synchronized Object get() {
        while (isEmpty())
            try { wait(); } catch (InterruptedException e) {}
        Object msg = remove();
        notifyAll();
        return msg;
    }
    private boolean isFull() {return size >= theQueue.length;}
    private boolean isEmpty() {return size == 0;}
    private void insert(Object msg) {
        theQueue[putIdx++] = msg;
        putIdx = putIdx % theQueue.length;
        size++;
    }
    private Object remove() {
        Object msg = theQueue[getIdx++];
        getIdx = getIdx % theQueue.length;
        size--;
        return msg;
    }
}
Running the
program again, now with a bounded buffer instead gives this output.
>java TimedPipeline -s 10000 +b
Max = 10000 message
BlockSize = 10000 bytes
QueueSize = 32 slots
BoundedQueue = true
1) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb
2) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb
3) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb
4) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb
5) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb
. . .
506) Consumer queue size = 32, Memory alloc/total/max = 1/1/128 Mb
507) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb
508) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb
509) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb
Producer terminated
510) Consumer queue size = 21, Memory alloc/total/max = 0/1/128 Mb
511) Consumer queue size = 1, Memory alloc/total/max = 0/1/128 Mb
Consumer terminated
This problem clearly illustrates that you should go for a bounded message queue whenever you want two or more threads to communicate. The reason is that if you have an unbounded flow of messages, you must regulate that flow so it doesn't exhaust the available queue slots. On the other hand, if you do have a bounded flow of messages, that is, if there is a well-defined upper bound on the number of outstanding messages, you can use an unbounded queue instead.
Using a bounded message queue solves most of your thread communication demands, except for some real-time system situations I will discuss next. Follow me on to the next section.
Pretend that your application is a life-sustaining system, monitoring heartbeats and reacting to changes in the rhythm. There might be a producer reading a medical device and sending data messages to a consumer that interprets the information and reacts to it. If the message queue becomes full and the producer is suspended, important information might get lost. In a critical moment, the last seconds of heartbeats are more important than those before.
Another, less extreme, example is when the producer is not a thread, but an interrupt routine. In Java, you can use the Java Communications API to read and write over serial and parallel connections, or you can handwrite something else in C/C++ using the Java Native Interface (JNI). An interrupt routine in C might invoke a callback method in Java, using JNI. This callback method can then insert a message into a queue. The problem here is that you can't easily suspend the interrupt routine or callback method without resorting to disabling the interrupt, which might be cumbersome or inappropriate.
The two problem statements above call for a solution where it's possible to throw away old messages in the queue in favour of new ones, instead of suspending the producer side.
However, what if we really want to keep the old messages in the queue, and new messages can be re-sent thanks to some application level timeout mechanism? This calls for a variant solution, where new messages are rejected instead.
The
solution is three different implementations of the put method, realised in a
base class (bounded queue) and two sub-classes (skip new and skip old messages
respectively). I show you the three put methods below and direct you to the
source code if you want to read all of it.
public synchronized void put(Object msg) throws InterruptedException {
    performPut(msg);
    notifyAll();
}
// Suspend sender (class BoundedQueue)
protected void performPut(Object msg) throws InterruptedException {
    while (isFull())
        wait();
    insert(msg);
    size++;
}
// Skip old messages (class BoundedQueueSkipOld)
protected void performPut(Object msg) throws InterruptedException {
    if (isFull()) {
        getIdx = next(getIdx); // move the get index (overwrite)
    } else {
        size++;
    }
    insert(msg);
}
// Skip new messages (class BoundedQueueSkipNew)
protected void performPut(Object msg) throws InterruptedException {
    if (isFull()) return; // skip msg if it's full
    insert(msg);
    size++;
}
// Helper methods
public synchronized boolean isFull() {
    return size >= queue.length;
}
protected void insert(Object msg) {
    queue[putIdx] = msg;
    putIdx = next(putIdx);
}
protected int next(int idx) {
    return (idx + 1) % queue.length;
}
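To make the difference between the two overflow policies concrete, here is a small single-threaded sketch. It is not part of the library: the class name OverflowPolicyDemo and its methods are invented for this illustration, and all synchronization is left out on purpose so that only the index arithmetic remains. It puts the numbers 1 to 5 into a queue with three slots and then empties it.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded illustration of the two overflow policies.
// Hypothetical names; synchronization is deliberately omitted.
class OverflowPolicyDemo {
    private final Object[] queue;
    private int putIdx = 0, getIdx = 0, size = 0;
    private final boolean skipOld;

    OverflowPolicyDemo(int nSlots, boolean skipOld) {
        this.queue = new Object[nSlots];
        this.skipOld = skipOld;
    }

    void put(Object msg) {
        if (size >= queue.length) {
            if (!skipOld) return;      // skip new: reject the message
            getIdx = next(getIdx);     // skip old: give up the oldest slot
        } else {
            size++;
        }
        queue[putIdx] = msg;
        putIdx = next(putIdx);
    }

    // Removes and returns everything in FIFO order.
    List<Object> drain() {
        List<Object> out = new ArrayList<Object>();
        while (size > 0) {
            out.add(queue[getIdx]);
            getIdx = next(getIdx);
            size--;
        }
        return out;
    }

    private int next(int idx) { return (idx + 1) % queue.length; }

    // Puts 1..5 into a 3-slot queue and returns what is left in it.
    static List<Object> run(boolean skipOld) {
        OverflowPolicyDemo q = new OverflowPolicyDemo(3, skipOld);
        for (int i = 1; i <= 5; i++) q.put(Integer.valueOf(i));
        return q.drain();
    }

    public static void main(String[] args) {
        System.out.println("skip old: " + run(true));   // [3, 4, 5]
        System.out.println("skip new: " + run(false));  // [1, 2, 3]
    }
}
```

With the skip old policy the queue ends up holding the three most recent messages, which is what the heartbeat monitor wants. With the skip new policy it keeps the three earliest ones and relies on the sender to re-send the rest.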
The
discussion in this section naturally leads into a class hierarchy representing
the different flavours of message queues. You can find a class diagram below,
which shows the structure of a small class library for thread message queues.
The basic abstraction is represented by an interface (MessageQueue) with an
abstract sub-class realizing the interface and implementing common
functionality and template methods. The abstract class is then extended into
two classes, one realizing an unbounded queue and the other a bounded queue.
The bounded queue is further specialized into two classes that differ in how
they treat messages when the queue is full; either skip new messages or skip
old messages. You can find the Java code of the library below, after the
diagram.

public interface MessageQueue {
    public void put(Object msg) throws InterruptedException;
    public Object get() throws InterruptedException;
    public boolean isEmpty();
    public boolean isFull();
    public int getSize();
}
public abstract class AbstractMessageQueue implements MessageQueue {
    public synchronized void put(Object msg) throws InterruptedException {
        performPut(msg);
        notifyAll();
    }
    public synchronized Object get() throws InterruptedException {
        Object msg = performGet();
        notifyAll();
        return msg;
    }
    public synchronized int getSize() {return performSize();}
    public synchronized boolean isEmpty() {return (getSize() == 0);}
    public synchronized boolean isFull() {return false;}
    protected abstract void performPut(Object msg) throws InterruptedException;
    protected abstract Object performGet() throws InterruptedException;
    protected abstract int performSize();
}
import java.util.LinkedList;
public class UnboundedMessageQueue extends AbstractMessageQueue {
    private LinkedList queue = new LinkedList();
    public UnboundedMessageQueue() {}
    protected void performPut(Object msg) throws InterruptedException {
        queue.addLast(msg);
    }
    protected Object performGet() throws InterruptedException {
        while (queue.isEmpty()) wait();
        return queue.removeFirst();
    }
    protected int performSize() {return queue.size();}
}
public class BoundedMessageQueue extends AbstractMessageQueue {
    protected Object[] queue;
    protected int putIdx = 0, getIdx = 0, size = 0;
    public BoundedMessageQueue() {this(32);}
    public BoundedMessageQueue(int n) {queue = new Object[n];}
    protected void performPut(Object msg) throws InterruptedException {
        while (isFull()) wait();
        insert(msg);
        size++;
    }
    protected Object performGet() throws InterruptedException {
        while (isEmpty()) wait();
        Object msg = remove();
        size--;
        return msg;
    }
    protected void insert(Object msg) {
        queue[putIdx] = msg;
        putIdx = next(putIdx);
    }
    protected Object remove() {
        Object msg = queue[getIdx];
        getIdx = next(getIdx);
        return msg;
    }
    public synchronized boolean isFull() {return size >= queue.length;}
    protected int performSize() {return size;}
    protected int next(int idx) {return (idx+1) % queue.length;}
}
public class BoundedMessageQueueSkipOld extends BoundedMessageQueue {
    public BoundedMessageQueueSkipOld() {}
    public BoundedMessageQueueSkipOld(int n) {super(n);}
    protected void performPut(Object msg) throws InterruptedException {
        if (isFull()) {
            getIdx = next(getIdx); // the oldest message is overwritten
        } else {
            size++;
        }
        insert(msg);
    }
}
public class BoundedMessageQueueSkipNew extends BoundedMessageQueue {
    public BoundedMessageQueueSkipNew() {}
    public BoundedMessageQueueSkipNew(int n) {super(n);}
    protected void performPut(Object msg) throws InterruptedException {
        if (isFull()) return; // the new message is dropped
        insert(msg);
        size++;
    }
}
This
section introduces the concept Thread Method Invocation (TMI), which is
something in-between local method invocation (LMI) and remote method invocation
(RMI).
LMI is the
normal semantics when a method of an object is called. RMI on the other hand,
means a method of an object in another address space is called. LMI is
performed inside the context (function call stack) of one thread, but TMI is
performed in two different contexts.
All thread
programs so far in this article have been using a push model of thread
communication. That means the calling thread is sending something to the thread
being called. However, how can we go the other way around? That means the
caller receives data back instead. Moreover, how can the calling thread both
send and receive data?
This
semantics is well known if we are speaking about local method invocation, but
also remote method invocation (RMI).
Pretend that we need a computation thread responsible for computing Fibonacci numbers. The n-th Fibonacci number is defined as
$$F(n) = \begin{cases} 1 & \text{if } n \le 2 \\ F(n-1) + F(n-2) & \text{if } n > 2 \end{cases}$$
It would be very convenient if we could invoke the thread using the well-known local method invocation semantics, instead of struggling with double mailboxes or queues. Here is a code fragment showing what I mean:
FibonacciServer fib = new FibonacciServer();
fib.start();
int result = fib.compute(10);
The body of the compute method is executed in the context of the server and not in the context of the calling thread. This very simplified example shows how data is sent to and received from a thread.
TMI is the basic mechanism for thread communication in the programming language Ada. In Ada, a thread is called a task and TMI is called rendezvous.
It's not obvious how to realize TMI. We can't use a message queue in the way we have studied so far, because the data transfer is one-way only. The reply message can't be stored in a member variable of the queue either, because there might be several threads calling the server, each one suspended and waiting in the queue. We must be able to resume a specific sender thread whenever a reply message has been computed based on data taken from that sender's message.
The solution is to introduce a wrapper message that contains both the sender's message and a single slot queue, where the sender thread can wait for the reply message. Have a look at the figure below.

The calling thread (client) creates a wrapper, stores the message in it, puts the wrapper into the server's message queue and then calls waitForReplyMessage, which suspends it. Eventually, the server pulls the wrapper out of its message queue and extracts the sender's message by calling getSenderMessage. When the server has finished its computation, it calls putReplyMessage, which inserts a reply message in the reply queue of the wrapper. This resumes the client, which receives the reply message and continues its execution. The message queue of the server contains wrapper messages, one for each waiting sender thread.

Let me start with the implementation and then show the code of the Fibonacci server example. Here is the class RendezvousMessageQueue containing the inner class MessageWrapper.
class RendezvousMessageQueue implements MessageQueue {
    private MessageQueue queue = new UnboundedMessageQueue();
    public RendezvousMessageQueue() {}
    public Object put(Object msg) throws InterruptedException {
        MessageWrapper wrapper = new MessageWrapper(msg);
        queue.put(wrapper);
        return wrapper.waitForReplyMessage();
    }
    public Object get() throws InterruptedException {
        return queue.get();
    }
    public boolean isEmpty() {return queue.isEmpty();}
    public boolean isFull() {return false;}
    public int getSize() {return queue.getSize();}
    public class MessageWrapper {
        private Object msg;
        private MessageQueue replyQ = new BoundedMessageQueue(1);
        protected MessageWrapper(Object msg) {this.msg = msg;}
        public Object getSenderMessage() {return msg;}
        public void putReplyMessage(Object msg) throws InterruptedException {
            replyQ.put(msg);
        }
        public Object waitForReplyMessage() throws InterruptedException {
            return replyQ.get();
        }
    }
}
You might ask why I'm using an unbounded queue inside the RendezvousMessageQueue, in the light of what I have said in this article about the danger of unbounded queues. The reason is that every sender is suspended until it gets its reply, so the queue can never hold more entries than there are threads in the program. There is therefore an upper bound on the queue size.
Using a RendezvousMessageQueue, it's straightforward to implement our Fibonacci server. Below I show you the server and a simple client. As always, you can find the complete source code to this program and all other programs at the end of this article.
class FibonacciServer extends Thread {
    private RendezvousMessageQueue inbox = new RendezvousMessageQueue();
    public FibonacciServer() {. . .}
    public int compute(int n) throws InterruptedException {
        return ((Integer)inbox.put(new Integer(n))).intValue();
    }
    public void run() {
        try {
            while (true) {
                RendezvousMessageQueue.MessageWrapper w =
                    (RendezvousMessageQueue.MessageWrapper)inbox.get();
                int n = ((Integer)w.getSenderMessage()).intValue();
                int f = fib(n);
                w.putReplyMessage(new Integer(f));
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: terminate the server thread.
        }
    }
    private int fib(int n) {
        return (n <= 2 ? 1 : fib(n-1) + fib(n-2));
    }
}
class Client extends Thread {
    private FibonacciServer server;
    private int n;
    public Client(int id, FibonacciServer server, int n) {
        super("Client-"+id);
        this.server = server;
        this.n = n;
    }
    public void run() {
        try {
            for (int i = 1; i <= n; i++) {
                System.out.println(getName() + ": fib("+i+") = " + server.compute(i));
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting for a reply: stop the client.
        }
    }
}
Running the
program with one client gives this output.
>java Fibonacci -c 1 -n 30
Client-1: fib(1) = 1
Client-1: fib(2) = 1
Client-1: fib(3) = 2
Client-1: fib(4) = 3
Client-1: fib(5) = 5
Client-1: fib(6) = 8
Client-1: fib(7) = 13
Client-1: fib(8) = 21
Client-1: fib(9) = 34
Client-1: fib(10) = 55
. . .
Client-1: fib(27) = 196418
Client-1: fib(28) = 317811
Client-1: fib(29) = 514229
Client-1: fib(30) = 832040
Here is the
excerpt from another run, where the number of clients is 100.
>java Fibonacci -c 100 -n 30
Client-2: fib(1) = 1
Client-13: fib(1) = 1
Client-3: fib(1) = 1
Client-2: fib(2) = 1
Client-4: fib(1) = 1
Client-5: fib(1) = 1
Client-6: fib(1) = 1
Client-13: fib(2) = 1
Client-7: fib(1) = 1
Client-8: fib(1) = 1
Client-10: fib(1) = 1
Client-3: fib(2) = 1
. . .
Client-16: fib(29) = 514229
Client-80: fib(29) = 514229
Client-90: fib(28) = 317811
Client-100: fib(30) = 832040
Client-16: fib(30) = 832040
Client-80: fib(30) = 832040
Client-90: fib(29) = 514229
Client-93: fib(30) = 832040
Client-90: fib(30) = 832040
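For comparison, the same wrapper idea can be sketched with the standard java.util.concurrent queues, which appeared in the JDK after this article was written. This is not the library developed here, only an illustration of the structure: a request object carries the argument together with a one-slot reply queue. The names FibServerSketch, Request and callOnce are made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: java.util.concurrent stands in for the article's queue classes.
class FibServerSketch extends Thread {
    // Wrapper carrying the argument and a one-slot reply queue.
    private static final class Request {
        final int n;
        final BlockingQueue<Integer> reply = new ArrayBlockingQueue<Integer>(1);
        Request(int n) { this.n = n; }
    }

    private final BlockingQueue<Request> inbox = new LinkedBlockingQueue<Request>();

    FibServerSketch() {
        setDaemon(true);   // let the JVM exit although run() loops forever
    }

    // Called by client threads; blocks until the server has replied.
    int compute(int n) throws InterruptedException {
        Request r = new Request(n);
        inbox.put(r);
        return r.reply.take().intValue();
    }

    public void run() {
        try {
            while (true) {
                Request r = inbox.take();
                r.reply.put(Integer.valueOf(fib(r.n)));
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: terminate the server thread.
        }
    }

    private static int fib(int n) {
        return n <= 2 ? 1 : fib(n - 1) + fib(n - 2);
    }

    // Convenience helper: start a server, make one call, return the answer.
    static int callOnce(int n) {
        FibServerSketch server = new FibServerSketch();
        server.start();
        try {
            return server.compute(n);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fib(10) = " + callOnce(10));
    }
}
```

The caller of compute never sees a queue. From its point of view this is an ordinary method call that happens to be executed by another thread.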
We can now
extend the class diagram of our library with two new classes; the rendezvous
queue with its wrapper class. The updated class diagram is shown below.

In this
section we have motivated and implemented thread method invocation (TMI) or
rendezvous in Ada parlance. That means it’s now possible to declare a thread
method and invoke it from another thread using the well-known method call
semantics.
However, if we want two or more thread methods and in addition want to be able to receive a message from either of them at any time, there is no simple solution. Follow me on to the next section, where I will address that problem.
One of the biggest advantages of thread method invocation (TMI) is type safety. The parameter type (or types), together with the return type, determines what kind of data can be sent to the recipient side. In addition, every violation will be detected by the compiler instead of slipping through as a run-time bug.
This is all good, until we consider more than one thread method. Let's pretend we have a small life-sustaining system, where one thread (HeartSampler) samples the heartbeats and another one (BreathingSampler) samples the breathing rate. These two threads send their data to the Merger thread for further processing. The public interface of the Merger might look like this:
interface Merger {
    void heartBeat(float amplitude);
    void breathing(float pressure, float carbonDioxide);
}
I’m not a
medical specialist; so don’t expect me to know what I’m talking about regarding
the parameters in the methods of the interface above. Just view it as two
methods with some parameters.
The merger
can be realized as a thread with two message queues, as shown in the class
declaration below. Each queue has a corresponding message wrapper that can
contain the sent arguments.
class MergerThread extends Thread implements Merger {
    private MessageQueue heartQ = new BoundedMessageQueueSkipOld(100);
    private MessageQueue breathQ = new BoundedMessageQueueSkipOld(100);
    private class HeartMsg {
        float amplitude;
        HeartMsg(float a) {amplitude=a;}
    }
    private class BreathMsg {
        float pressure, carbonDioxide;
        BreathMsg(float p, float c) {pressure=p; carbonDioxide=c;}
    }
    public void heartBeat(float amplitude) {
        try {
            heartQ.put(new HeartMsg(amplitude));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
        }
    }
    public void breathing(float pressure, float carbonDioxide) {
        try {
            breathQ.put(new BreathMsg(pressure, carbonDioxide));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
        }
    }
    public void run() {. . .}
}
So far, we
have seen how to receive messages from one queue at a time. Below I show you
one possibility of receiving messages from two queues.
while (. . .) {
    HeartMsg h = (HeartMsg)heartQ.get();
    BreathMsg b = (BreathMsg)breathQ.get();
    . . .
}
The problem with the code fragment above is that it requires the heart rate to be equal to the breathing rate, an assumption that is just plain wrong. The code states that we should first receive a heartbeat message and then wait for and receive a breathing message. Try holding your breath and counting the number of heartbeats during that time interval. Then do it again and compare with another person. The heartbeat rate and the breathing rate are two independent rates that can't be predicted accurately, so we must be able to wait for either a heartbeat message or a breathing message.
This problem was identified a long time ago in the real-time programming language design community. It is called non-deterministic message acceptance, because it is not determined which message type will be accepted next.
The Ada language is based on message passing between threads (called tasks) using thread method invocation (called rendezvous). Each thread message (called entry point) can be received (called accepted) in isolation in an ACCEPT statement or together in a SELECT statement containing several ACCEPT statements. The SELECT statement lists all message types that can be accepted by the thread at that particular point in the thread's execution path.
The SELECT statement suspends the accepting (server) thread if no messages have been sent. The first message received will resume the server thread, and the execution continues from the corresponding ACCEPT statement. If more than one message (type) has been sent, the SELECT statement must choose one ACCEPT statement. The semantics of Ada states that it must be an arbitrary message, but chosen in a fair manner. This means that the same ACCEPT must not be chosen every time.
The most straightforward implementation arranges the queues in a (logical) circle and searches for a non-empty queue for at most one turn. The next search starts from the next queue in the circle.
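The circular search can be illustrated in isolation. The sketch below replaces the real queues with an array of queue lengths, so that only the search order is visible. The class CircularSearchDemo and its methods are invented for this illustration, and it assumes that the start position advances by one slot after every search.

```java
// Illustration of the circular search, using queue lengths instead of real queues.
// Hypothetical class; it only demonstrates the order in which queues are chosen.
class CircularSearchDemo {
    private int startIdx = 0;

    // Returns the index of the first non-empty queue, searching at most one
    // full turn from startIdx, or -1 if every queue is empty.
    int select(int[] queueSizes) {
        int n = queueSizes.length;
        if (n == 0) return -1;
        int found = -1;
        for (int step = 0; step < n; step++) {
            int idx = (startIdx + step) % n;
            if (queueSizes[idx] > 0) { found = idx; break; }
        }
        startIdx = (startIdx + 1) % n;   // the next search starts one queue further on
        return found;
    }

    // Two queues with three messages each; returns the order of acceptance.
    static String run() {
        CircularSearchDemo s = new CircularSearchDemo();
        int[] sizes = {3, 3};
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            int idx = s.select(sizes);
            sizes[idx]--;                // "receive" one message from that queue
            order.append(idx);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());       // prints 010101
    }
}
```

Although both queues always have messages waiting, the server alternates between them, so neither sender is starved.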

The solution to our merger problem is to implement a mechanism in Java that is similar to the SELECT statement in Ada. We can't change the syntax of Java, so we have to settle for an object that realizes the requested semantics.
A MessageQueueGroup assembles a set of MessageQueue objects and provides semantics similar to Ada's SELECT. In order to properly implement the group we need to solve two sub-problems. The first is the easy one: management of a java.util.ArrayList and an index pointing to the start position for a circular search.
The other problem is subtler: how to manage the synchronization among the different queues and the group object. Think about it. A client thread inserts a message into some queue by calling its put method. This method notifies another party by calling notifyAll. The server thread calls a method on the group object to get a non-empty queue. If no queue can be found, the server is suspended by calling wait. The queue and the group are different objects, which means they denote different monitors as well. The figure below illustrates the dilemma. The notifyAll call on the third queue's lock doesn't resume the server, because it's sleeping on the group's lock.

We can solve this problem by ensuring that all objects agree on one single lock. Here the synchronized block comes to the rescue. In the solution below, all queues are using the lock of the group object. This is set up when a queue is added to a group and reset when it's removed again.

All message
queue classes inherit from AbstractMessageQueue, which implements both the put
and the get method using the template method design pattern. The queue logic is
delegated to some subclass but the synchronization logic is managed by the
class itself. This class is modified to use a specific object for
synchronization instead of its own lock. This lock object is initialised to the
value this, but can be changed in a setter method. All public methods are using
a synchronized block with the lock object as argument instead of being declared
as synchronized methods. The code fragment below shows the changes.
public abstract class AbstractMessageQueue implements MessageQueue {
    protected Object synchObj = this;
    . . .
    protected void setSynchObject(Object obj) {
        synchObj = obj;
    }
    public Object get() throws InterruptedException {
        synchronized (synchObj) {
            Object msg = performGet();
            synchObj.notifyAll();
            return msg;
        }
    }
    . . .
}
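The effect of a shared lock object can be shown in a few lines. The sketch below is independent of the library classes. SharedLockQueue and receiveFromAny are names invented for this illustration, with the lock field playing the role of synchObj. Note that both wait and notifyAll are called on the shared object: Java throws an IllegalMonitorStateException if a thread calls them on an object whose monitor it does not hold, so once the queues synchronize on the group, the waiting in performPut and performGet presumably has to be done on synchObj as well.

```java
import java.util.LinkedList;

// Sketch: two queues that synchronize on one shared lock object, so a
// receiver waiting on that lock is woken by a put on either queue.
class SharedLockQueue {
    private final LinkedList<Object> items = new LinkedList<Object>();
    private final Object lock;           // plays the role of synchObj

    SharedLockQueue(Object lock) { this.lock = lock; }

    void put(Object msg) {
        synchronized (lock) {
            items.addLast(msg);
            lock.notifyAll();            // notify on the shared monitor
        }
    }

    // The caller must hold the lock; used by receiveFromAny below.
    boolean isEmpty() { return items.isEmpty(); }
    Object remove() { return items.removeFirst(); }

    // Waits until either queue has a message and returns it.
    static Object receiveFromAny(Object lock, SharedLockQueue a, SharedLockQueue b)
            throws InterruptedException {
        synchronized (lock) {
            while (a.isEmpty() && b.isEmpty()) {
                lock.wait();             // wait on the same monitor the puts notify
            }
            return a.isEmpty() ? b.remove() : a.remove();
        }
    }

    // One sender puts into the second queue; the caller receives from either.
    static Object demo() {
        final Object lock = new Object();
        final SharedLockQueue q1 = new SharedLockQueue(lock);
        final SharedLockQueue q2 = new SharedLockQueue(lock);
        Thread sender = new Thread() {
            public void run() { q2.put("hello via q2"); }
        };
        sender.start();
        try {
            return receiveFromAny(lock, q1, q2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

It does not matter whether the sender runs before or after the receiver starts waiting. The receiver re-checks both queues under the shared lock, so the message cannot be missed.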
The RendezvousMessageQueue class needs a slightly different change, because it's using an inner queue. The setSynchObject method is overridden so that it passes the synchronization object on to the inner queue.
public class RendezvousMessageQueue extends AbstractMessageQueue {
    private MessageQueue queue = new UnboundedMessageQueue();
    . . .
    protected void setSynchObject(Object obj) {
        ((AbstractMessageQueue)queue).setSynchObject(obj);
    }
    . . .
}
I show you
the code for the MessageQueueGroup class below. You can find the source code
for this class and all other classes at the end of this article.
import java.util.ArrayList;
import java.util.List;
public class MessageQueueGroup {
    private List queueSet = new ArrayList();
    private int startSearchIdx = 0;
    public MessageQueueGroup() {}
    public synchronized void add(MessageQueue queue) {
        if (queueSet.contains(queue)) return;
        ((AbstractMessageQueue)queue).setSynchObject(this);
        queueSet.add(queue);
    }
    public synchronized void remove(MessageQueue queue) {
        if (!queue.isEmpty()) {
            throw new IllegalArgumentException("...");
        }
        if (queueSet.contains(queue)) {
            queueSet.remove(queue);
            ((AbstractMessageQueue)queue).setSynchObject(queue);
        }
    }
    public synchronized MessageQueue getNonEmptyQueue() throws InterruptedException {
        while (true) {
            MessageQueue queue = findQueue(startSearchIdx);
            startSearchIdx = next(startSearchIdx);
            if (queue != null) return queue;
            wait(); // Suspend the receiving side (server)
        }
    }
    protected MessageQueue findQueue(int startIdx) {
        if (queueSet.isEmpty())
            return null;
        // The set may have shrunk since the start index was stored.
        int idx = startIdx % queueSet.size(), endIdx = idx;
        do {
            MessageQueue q = (MessageQueue)queueSet.get(idx);
            if (!q.isEmpty()) return q;
            idx = next(idx);
        } while (idx != endIdx);
        return null;
    }
    protected int next(int idx) {
        // Avoid a division by zero while the group is still empty.
        return queueSet.isEmpty() ? 0 : (idx + 1) % queueSet.size();
    }
}
Using our
new group object, we can return to the merger thread problem in the beginning
of this section. The constructor should create a MessageQueueGroup object and
add the two queues to it.
. . .
private MessageQueue heartQ = new BoundedMessageQueueSkipOld(100);
private MessageQueue breathQ = new BoundedMessageQueueSkipOld(100);
private MessageQueueGroup group = new MessageQueueGroup();
public MergerThread() {
    group.add(heartQ);
    group.add(breathQ);
}
We can then rewrite the code fragment of the run method in the MergerThread like this:
while (. . .) {
    MessageQueue q = group.getNonEmptyQueue();
    Object msg = q.get();
    if (q == heartQ) {
        HeartMsg hmsg = (HeartMsg)msg;
        . . .
    } else {
        BreathMsg bmsg = (BreathMsg)msg;
        . . .
    }
    . . .
}
The dispatching of the message acceptance logic is performed by a simple if-statement here, but if there are many message types, it’s better to use a switch-statement together with fixed integer values for each message type.
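A minimal sketch of such a switch-based dispatch is given here. TypedMsg, its constants and Dispatcher are hypothetical names chosen for illustration; they are not part of the message queue library.

```java
// Hypothetical message type carrying a fixed integer type id.
class TypedMsg {
    static final int HEART = 0;
    static final int BREATH = 1;

    final int type;
    final Object payload;

    TypedMsg(int type, Object payload) {
        this.type = type;
        this.payload = payload;
    }
}

class Dispatcher {
    // Selects the acceptance logic from the type id instead of comparing queues.
    static String dispatch(TypedMsg m) {
        switch (m.type) {
            case TypedMsg.HEART:
                return "heart:" + m.payload;
            case TypedMsg.BREATH:
                return "breath:" + m.payload;
            default:
                throw new IllegalArgumentException("Unknown message type: " + m.type);
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new TypedMsg(TypedMsg.HEART, 72)));  // heart:72
        System.out.println(dispatch(new TypedMsg(TypedMsg.BREATH, 14))); // breath:14
    }
}
```

The default branch makes an unknown type id fail loudly instead of being silently dropped.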
Let’s write
a small program that demonstrates the idea of non-deterministic message
acceptance in practice. I have deliberately changed the heartbeat and breathing
threads into two simple number counter threads instead, where the first sends
odd numbers and the second even numbers. The merger thread adds all received
numbers together.

Here are
the relevant parts of the program. I have made some simplifications for
clarity. You can find the complete source code at the end of this article.
interface Merger {
    void putOddNumber(int n);
    void putEvenNumber(int n);
}

class MergerThread extends Thread implements Merger {
    private MessageQueue oddQ = new BoundedMessageQueue(10);
    private MessageQueue evenQ = new BoundedMessageQueue(10);
    private MessageQueueGroup group = new MessageQueueGroup();
    private int sum = 0;

    public MergerThread() {
        group.add(oddQ);
        group.add(evenQ);
        start();
    }

    public void putOddNumber(int n) { oddQ.put(new Integer(n)); }
    public void putEvenNumber(int n) { evenQ.put(new Integer(n)); }
    public int getSum() { return sum; }

    public void run() {
        while (true) {
            MessageQueue q = group.getNonEmptyQueue();
            int n = ((Integer) q.get()).intValue();
            sum += n;
        }
    }
}
private static class ClientThread extends Thread {
    private Merger merger;
    private boolean sendOddNumbers;
    private int max;

    public ClientThread(int max, boolean sendOddNumbers, Merger merger) {
        this.max = max;
        this.merger = merger;
        this.sendOddNumbers = sendOddNumbers;
        start();
    }

    public void run() {
        for (int k = (sendOddNumbers ? 1 : 2); k <= max; k += 2) {
            if (sendOddNumbers) { merger.putOddNumber(k); sleep(5); }
            else { merger.putEvenNumber(k); sleep(15); }
        }
    }
}
Running the program, where it counts from 1 to 10000, gives the output shown below
>java NumberMerger -m 10000
max = 10000
--------------
Sum : 50005000
Expected: 50005000
We are almost through; however, there is one glitch left. Sometimes, you can’t wait for all queues in a group. Follow me on to the next section, where this problem will be dissected.
The SELECT
statement in Ada and our MessageQueueGroup class in Java are terrific operators
in the context of message passing, because they allow a thread to wait for
different types of messages concurrently. However, in certain situations it’s
not appropriate to wait for every possible message type. The application state
of a thread may dictate that a subset of the available types should be enabled
instead. Let’s jump into an example to clarify what I mean.
The POTS (Plain Ordinary Telephony Services) algorithm is used in telephone branch exchange systems, where it controls a single terminal (phone) and its interactions with other terminals. It’s usually expressed as a state machine, with input signals such as off-hook, button(b) and on-hook, and state transitions based on those inputs. Describing all of POTS is out of scope for this article, because it’s way too complex, but I can give you a slight flavour of it.
A terminal starts out in the idle state and can receive an off-hook signal when the user lifts the handset. The controller attaches the “ready tone” generator to the terminal, so the user knows when to start dialling. The controller then transitions to the waitForFirstDigit state, where it can receive either a button(b) signal or an on-hook signal.
If the
controller receives an on-hook signal, because the user laid down the
handset again, it detaches the generator and transitions to the idle
state again. If the controller receives a button(b) signal instead, it
detaches the generator, sends the button id to a number analyser and
transitions to the waitForMoreDigits state.
In this
state it accepts both button(b) signals and on-hook signals. The
number analyser examines a digit sequence and determines if it’s incomplete,
complete, or invalid. Depending on the outcome the controller
takes different transitions.
If we want
to model the controller as a message-passing thread, it would be very
convenient if we could enable and disable different signals based on the
controller state.
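To make the idea concrete, here is a small sketch of the three states described above, with the set of enabled signals attached to each state. Everything in it is an assumption made for illustration: the names Signal, State and PotsController are hypothetical, the tone generator and number analyser are left out, and the on-hook transition from waitForMoreDigits back to idle is assumed rather than taken from the description.

```java
import java.util.EnumSet;
import java.util.Set;

enum Signal { OFF_HOOK, BUTTON, ON_HOOK }

// Each state lists the signals it is willing to accept.
enum State {
    IDLE(EnumSet.of(Signal.OFF_HOOK)),
    WAIT_FOR_FIRST_DIGIT(EnumSet.of(Signal.BUTTON, Signal.ON_HOOK)),
    WAIT_FOR_MORE_DIGITS(EnumSet.of(Signal.BUTTON, Signal.ON_HOOK));

    private final Set<Signal> enabled;

    State(Set<Signal> enabled) { this.enabled = enabled; }

    boolean accepts(Signal s) { return enabled.contains(s); }
}

class PotsController {
    private State state = State.IDLE;

    State state() { return state; }

    // Returns false when the signal is not enabled in the current state.
    boolean handle(Signal s) {
        if (!state.accepts(s)) return false;
        switch (state) {
            case IDLE: // only OFF_HOOK is enabled here
                state = State.WAIT_FOR_FIRST_DIGIT;
                break;
            case WAIT_FOR_FIRST_DIGIT:
                state = (s == Signal.ON_HOOK) ? State.IDLE : State.WAIT_FOR_MORE_DIGITS;
                break;
            case WAIT_FOR_MORE_DIGITS:
                // Assumption: ON_HOOK returns to idle; the analyser outcome is not modelled.
                if (s == Signal.ON_HOOK) state = State.IDLE;
                break;
        }
        return true;
    }

    public static void main(String[] args) {
        PotsController c = new PotsController();
        System.out.println(c.handle(Signal.BUTTON));   // false: not enabled in IDLE
        System.out.println(c.handle(Signal.OFF_HOOK)); // true
        System.out.println(c.state());                 // WAIT_FOR_FIRST_DIGIT
    }
}
```

In a message-passing version, the accepts check is what decides which queues the controller thread is allowed to receive from in a given state.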
The same need has been identified in the real-time language community, and the answer there is the guard. A guard can enable or disable an entry point (message type). You can find the concept of guards in virtually all message-passing languages, like Ada, Concurrent C, Erlang, Occam and others.
We can
introduce the idea in our message queue library, using an interface.
public
interface Guard {
public boolean isEnabled();
}
A guard object can then be linked with a queue whenever the queue is added to a message queue group. The search for a non-empty queue also needs to be modified, so it checks whether the guard is enabled or not. The code fragments below show the changes in the MessageQueueGroup class.
public synchronized void add(MessageQueue queue, Guard guard) {
    GuardedQueue q = new GuardedQueue(queue, guard);
    if (queueSet.contains(q)) return;
    ((AbstractMessageQueue) queue).setSynchObject(this);
    queueSet.add(q);
}

protected MessageQueue findQueue(int startIdx) {
    if (queueSet.isEmpty()) return null;
    int idx = startIdx, endIdx = idx;
    do {
        GuardedQueue q = (GuardedQueue) queueSet.get(idx);
        if (q.isReady()) return q.queue;
        idx = next(idx);
    } while (idx != endIdx);
    return null;
}

protected class GuardedQueue {
    MessageQueue queue;
    Guard guard;

    public GuardedQueue(MessageQueue queue, Guard guard) {
        this.queue = queue;
        this.guard = guard;
    }

    public boolean isReady() {
        return (queue.isEmpty() == false)
            && (guard == null || guard.isEnabled());
    }

    public boolean equals(Object other) {. . .}
}
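The effect of the guard on the search can be seen in a stripped-down form. This is a sketch under stated assumptions: the Guard interface is repeated only to keep the example self-contained, Entry is a hypothetical stand-in for GuardedQueue built on a plain java.util queue, and findReady returns an index instead of a queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Same shape as the Guard interface above, repeated for self-containment.
interface Guard {
    boolean isEnabled();
}

class GuardedSelectDemo {
    // Hypothetical stand-in for GuardedQueue: a plain queue plus an optional guard.
    static final class Entry {
        final Queue<String> queue = new ArrayDeque<>();
        final Guard guard;

        Entry(Guard guard) { this.guard = guard; }

        boolean isReady() {
            return !queue.isEmpty() && (guard == null || guard.isEnabled());
        }
    }

    // Circular search from startIdx; returns the index of the first ready entry or -1.
    static int findReady(List<Entry> entries, int startIdx) {
        int n = entries.size();
        for (int i = 0; i < n; i++) {
            int idx = (startIdx + i) % n;
            if (entries.get(idx).isReady()) return idx;
        }
        return -1;
    }

    public static void main(String[] args) {
        final boolean[] enabled = { false };
        List<Entry> entries = new ArrayList<>();
        entries.add(new Entry(() -> enabled[0])); // guarded queue
        entries.add(new Entry(null));             // unguarded queue
        entries.get(0).queue.add("guarded");
        entries.get(1).queue.add("plain");

        System.out.println(findReady(entries, 0)); // 1: entry 0 has a message, but its guard is off
        enabled[0] = true;
        System.out.println(findReady(entries, 0)); // 0: the guard now lets entry 0 through
    }
}
```

A queue with a disabled guard is treated exactly like an empty queue; its messages stay in place until the guard opens.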
At last, I
want to illustrate the usage of guards with a message queue group. Instead of
choosing a realistic example where the problem domain might hide the important
points in the context of this example, I prefer to use a toy example instead.
The idea is
a variant of the previous program, with two client threads sending a stream of
numbers, one with odd numbers, and the other with even numbers. As before,
there is a server thread responsible for accumulating the sum. The difference
is a new active mailbox thread in-between. The logic of the active mailbox
should be well known, if you have read the whole article so far. Therefore, we
can concentrate on the synchronization logic instead, which is far from
trivial.

The entry points of the mailbox (called Buffer) are described by the interface below
interface Buffer {
    void put(int x) throws InterruptedException;
    int get() throws InterruptedException;
    void quit() throws InterruptedException;
}
The client
and server are trivial and I will only show you the main loop of each thread.
The main loop of the client looks like this
for (int k = (sendOddNumbers ? 1 : 2); k <= max; k += 2) {
    buffer.put(k);
}
The main loop of the server looks like this
int k;
while ((k = buffer.get()) > 0) {
    sum += k;
}
In both cases, buffer is a reference to the active buffer thread. All three thread methods of Buffer are of rendezvous type. Note that the get method returns a value.
The buffer
thread is more complex compared to what we have seen so far in this article. It
contains three queues, one for each thread method and a queue group object
responsible for the non-deterministic selection of a non-empty queue. In
addition, it contains variables for the mailbox logic. Here is the first part
of the thread. As usual, you can find the complete source code at the end of
this article.
class BufferThread extends Thread implements Buffer {
    private static final int PUT = 0;
    private static final int GET = 1;
    private static final int QUIT = 2;

    private RendezvousMessageQueue putQ = new RendezvousMessageQueue();
    private RendezvousMessageQueue getQ = new RendezvousMessageQueue();
    private RendezvousMessageQueue quitQ = new RendezvousMessageQueue();
    private MessageQueueGroup group = new MessageQueueGroup();

    private Message msg = null;
    private boolean full = false;

    public BufferThread() {
        group.add(putQ, new IsEmptyGuard());
        group.add(getQ, new IsFullGuard());
        group.add(quitQ, new IsEmptyGuard());
        start();
    }
    . . .
A guard is supplied for each of the three queues when they are added to the group. Here is one of the two guard classes.
class IsEmptyGuard implements Guard {
    public boolean isEnabled() { return !full; }
}
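The second guard class is not listed here. A plausible counterpart simply returns the flag without negating it, so exactly one of put and get is enabled at any time. The sketch below is an assumption along those lines, not the published class: SlotState and setFull are hypothetical names, and the Guard interface is repeated only to make the example self-contained.

```java
// Same shape as the Guard interface of the library, repeated for self-containment.
interface Guard {
    boolean isEnabled();
}

// Hypothetical holder of the one-slot buffer state shared by both guards.
class SlotState {
    private boolean full = false;

    // Enabled while the slot is empty, i.e. a put can be accepted.
    class IsEmptyGuard implements Guard {
        public boolean isEnabled() { return !full; }
    }

    // Assumed counterpart: enabled while the slot is full, i.e. a get can be accepted.
    class IsFullGuard implements Guard {
        public boolean isEnabled() { return full; }
    }

    void setFull(boolean value) { full = value; }

    public static void main(String[] args) {
        SlotState s = new SlotState();
        Guard canPut = s.new IsEmptyGuard();
        Guard canGet = s.new IsFullGuard();
        System.out.println(canPut.isEnabled() + " " + canGet.isEnabled()); // true false
        s.setFull(true);
        System.out.println(canPut.isEnabled() + " " + canGet.isEnabled()); // false true
    }
}
```

Because both guards are inner classes, they read the same flag as the owning object, which is how the guards of the buffer thread can follow its state without any extra wiring.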
Next, I show you the client side of the three thread methods, where messages are inserted into the queues.
public void put(int x) throws InterruptedException {
    putQ.put(new Message(PUT, x));
}

public int get() throws InterruptedException {
    Message reply = (Message) getQ.put(new Message(GET));
    return reply.value;
}

public void quit() throws InterruptedException {
    quitQ.put(new Message(QUIT));
}
Every inserted message is wrapped in a Message object, containing a type id and the message value. When a message is received, the correct acceptance logic is chosen based on the type id. Finally, here is the run method of the buffer thread.
public void run() {
    boolean running = true;
    while (running) {
        RendezvousMessageQueue q =
            (RendezvousMessageQueue) group.getNonEmptyQueue();
        RendezvousMessageQueue.MessageWrapper w = q.getWrapper();
        Message m = (Message) w.getSenderMessage();
        Message r = null;
        switch (m.type) {
            case PUT:
                full = true;
                msg = m;
                break;
            case GET:
                full = false;
                r = msg;
                break;
            case QUIT:
                running = false;
                break;
        }
        w.putReplyMessage(r);
    }
}
The first statement inside the loop suspends the thread until a new message arrives in an enabled queue. When that happens, the thread resumes and the queue is returned. The queue is of rendezvous type and contains a wrapper object, which is extracted in the second statement. The wrapper contains the sender’s message and a single-slot queue in which the sender thread waits for the reply message. The sender’s message is extracted from the wrapper in the third statement.
The switch
statement dispatches on the message type based on the type id in the sender’s
message. Each case entry manipulates the application state as you can see
above.
The last
thing that happens, before it starts over again, is that the sender gets a
reply. The reply message contains data only in case of the GET method. In the
other cases, it has the value null. The important thing to consider is the
synchronization between the sender and the receiver. The sender is suspended
until the receiver calls putReplyMessage.
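The handshake itself can be reduced to a few lines of wait and notifyAll. The following is a rough sketch, not the library's RendezvousMessageQueue: Rendezvous, call, accept and reply are hypothetical names, and the cell assumes a single sender at a time.

```java
// Hypothetical rendezvous cell for ONE sender at a time: the sender blocks
// in call() until the receiver has accepted the request and replied.
class Rendezvous {
    private Object request;
    private Object reply;
    private boolean hasRequest = false;
    private boolean hasReply = false;

    // Sender side: hand over the request, then wait for the reply.
    synchronized Object call(Object req) throws InterruptedException {
        request = req;
        hasRequest = true;
        notifyAll();
        while (!hasReply) wait();
        hasReply = false;
        Object r = reply;
        reply = null;
        return r;
    }

    // Receiver side: wait until a request is available and take it.
    synchronized Object accept() throws InterruptedException {
        while (!hasRequest) wait();
        hasRequest = false;
        return request;
    }

    // Receiver side: release the suspended sender with a reply (may be null).
    synchronized void reply(Object r) {
        reply = r;
        hasReply = true;
        notifyAll();
    }
}

class RendezvousDemo {
    // A server thread doubles the number it receives; returns the reply seen by the sender.
    static int demo() {
        final Rendezvous rv = new Rendezvous();
        Thread server = new Thread(() -> {
            try {
                int x = (Integer) rv.accept();
                rv.reply(x * 2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.start();
        try {
            int result = (Integer) rv.call(21);
            server.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42
    }
}
```

Both waits sit in a while loop that rechecks its flag, so the order in which the two threads reach the cell does not matter.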
Now, let’s
run the program to ensure that it works in practice. If you ever wanted to know
the sum of 1,2…100000 here it is.
>java ActiveBuffer -m 100000
max = 100000
Buffer Started
Server Started
Client-ODD Started
Client-EVEN Started
Client-EVEN Terminated
Client-ODD Terminated
Server Terminated
Buffer Terminated
--------------
Sum : 5000050000
Expected: 5000050000
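The expected value follows from the closed form 1 + 2 + … + n = n(n+1)/2. The small check below uses illustrative names (SumCheck, expectedSum) and reproduces the expected value of this run and of the earlier run with 10000. It also shows that the larger result no longer fits in an int, so a sum of this size has to be accumulated in a long.

```java
class SumCheck {
    // Closed form for 1 + 2 + ... + max, computed in long arithmetic.
    static long expectedSum(long max) {
        return max * (max + 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println(expectedSum(10000));   // 50005000
        System.out.println(expectedSum(100000));  // 5000050000
        // Integer.MAX_VALUE is 2147483647, so the second sum overflows an int.
        System.out.println(expectedSum(100000) > Integer.MAX_VALUE); // true
    }
}
```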
This article started from the problem formulation of a simple communication link between two threads. I have shown you the minimum code needed to reliably transfer data between threads and pointed out many pitfalls along the way.
After the basic mechanism, a mailbox, was presented, I generalized the idea into a message queue. The most important abstraction presented was the idea of Thread Method Invocation (TMI), a concept inspired by the Ada programming language and Remote Method Invocation in Java.
I rounded off with a discussion of non-deterministic message acceptance and how it can be implemented in Java.
There are more aspects of this topic than I have discussed in this article, and they might be the subject of another article. Examples are how to implement active objects based on our strategy for non-deterministic message acceptance, and the danger of deadlocks in concurrent systems.
You can
find a link to a zip file containing all source code for this article. It
includes the message queue library and all demo programs, plus an Ant build
script. This script can compile all sources, run all unit tests, and generate
JavaDoc for the library.