Towards Ada Style Thread Communication in Java

or How to implement Thread Method Invocation

Jens Riboe

September 2002

 

Table of Contents

Introduction

Basic Thread Communication

Problem 1 – Why you need to use synchronized

Analysis

Solution

Bug warning 1

Bug warning 2

Bug warning 3

Problem 2 – Why you need to use wait and notify

Analysis

Solution

About the concept monitor

Problem 3 – Why you should use notifyAll instead of notify

Analysis

Solution

The final mailbox

Problem 4 – Why you should encapsulate a mailbox on the receiving side

Finding prime numbers

Buffered Thread Communication

Problem 5 – Why you should use bounded queues

Analysis

Solution

Problem 6 – Why you sometimes can’t suspend the sender, when the queue is full

A small class library for thread message queues

TMI – Thread Method Invocation

Problem 7 – Why you should consider Thread Method Invocation (TMI)

Solution

Non-Deterministic Message Acceptance

Problem 8 – Why you sometimes must combine TMI with non-determinism

Analysis

Solution

Problem 9 – Why you might need guards when you are using non-determinism

Conclusions

 


 

Introduction

The Java platform has made threads a standard tool for solving day-to-day programming tasks. Before the Java era, thread programming was something for the specialists lurking around with POSIX or Win32 threads in C or C++.

However, thread-based programs may suffer from a completely new category of bugs, essentially different from traditional ones. Many of these bugs originate from misconceptions about how threads communicate.

In this article, I will show you step by step how to build a robust and reliable library for thread communication. The presentation is problem-driven: I formulate and demonstrate a bug, then analyse it, and finally present a solution.

The article starts with the very basics: how to send some data from one thread to another. It then introduces the concept of rendezvous, taken from the Ada programming language, and renames it Thread Method Invocation (TMI). Finally, it discusses the idea of non-deterministic message acceptance, represented by the SELECT statement in Ada. All the discussions lead to the definition of a small class library for thread communication, which provides both basic and more complex primitives.

Basic Thread Communication

This section discusses and motivates the minimum amount of code needed to transfer a data message from one thread to another. It ends with how to properly use the keyword synchronized and the methods wait, notify and notifyAll. If you are already very familiar with these, you might want to skim this section and proceed to the next.

When two (or more) threads want to exchange data, they need to agree on a common place where one thread can put data and another can get it. This place is often referred to as a critical region, which loosely speaking means it’s critical to the correctness of the program execution. To understand the characteristics of such a region, let’s dive into a model problem that first shows what goes wrong and then how to fix it.

Problem 1 – Why you need to use synchronized

Consider a single bank account object containing an integer balance value. What happens if many actors update the account concurrently? Here is the class declaration for the Account class.

class Account {

    private int     balance = 0;

    public Account() {}

    public void updateBalance(int amount) {

        // balance += amount;

        int b = getBalance();  //READ

        b = b + amount;        //MODIFY

        setBalance(b);         //WRITE

    }

    public  int  getBalance()      {return balance;}

    private void setBalance(int b) {balance = b;}

}

As you can see above, I have decomposed the update statement into three statements to make it clear exactly what is going on. To update the account, we need to read, modify and write data. These three steps are part of every kind of update operation, independent of context.

In addition to the Account class, we need an updater thread. A thread can loosely be viewed as a separate program, which means it has its own program counter and its own call stack. Threads consist of a little bit more than that, but we leave that out for now.

The clearest way to define a thread in Java is to extend the java.lang.Thread class. A thread object in Java behaves like an ordinary object, plus it can execute its run method concurrently with other threads. Let’s skip the details of threads and focus on the significant code in the run method of the thread class Updater. Here it is:

int amount = 100;

for (int i = 0; i < numTransactions; i++) {

    account.updateBalance(amount);

}

for (int i = 0; i < numTransactions; i++) {

    account.updateBalance(-amount);

}

As you can see above, an updater thread first increments and then decrements the balance of the account, so the net effect after completion is that nothing has changed, i.e. the balance is still zero. The complete source code is at the end of this article, together with a zip file containing all source code for this article, an Ant build script and the needed jar files.

Now, let’s run the program and inspect the final balance. I have simplified the command line and the outputs for clarity. You can find detailed run instructions in the zip file.

>java AccountUpdater

NumUpdaters    : 5

NumTransactions: 10000

----------------

Final balance = 0

 

>java AccountUpdater -u 10 -t 100000

NumUpdaters    : 10

NumTransactions: 100000

----------------

Final balance = 0

It seems to work perfectly; end of story. Eh… not quite. Let’s run it again, but this time with a substantially heavier load.

>java AccountUpdater -u 10 -t 10000000

NumUpdaters    : 10

NumTransactions: 10000000

----------------

Final balance = 1154011500

 

>java AccountUpdater -u 10 -t 10000000

NumUpdaters    : 10

NumTransactions: 10000000

----------------

Final balance = -1652878200

As you can see above, we clearly have a bug. When the number of transactions is high, the final balance has a random value. Why?

Analysis

To understand the nature of the problem, we need to understand how threads execute and recap the read/modify/write operation. A thread executes like a small program and proceeds uninterrupted until something forces it to stop. Besides the trivial reason that the program has ended, it also takes a break when it waits for disk I/O to complete, for example. However, that doesn’t explain our program, because it contains no such operations.

Most modern operating systems and corresponding Java implementations use so-called pre-emptive scheduling, which means that each thread executes for at most a specified amount of time (a time slice). Then it is forced to take a break (suspend) to allow other threads to execute. Eventually, the thread resumes its execution for another time slice. Either it hits the end of the slice or it performs a suspending operation, like a write to disk. In both cases, it is suspended again. The end-of-slice suspension is triggered by a timer interrupt, and an interrupt can occur at any time, for example in the middle of a read/modify/write operation.

Let’s trace the execution of the updateBalance method. I have inlined the method body into the caller’s for loop, so you can see what’s going on.

for (int i = 0; i < numTransactions; i++) {

    // balance += amount;

    int b = getBalance();  //READ

    b = b + amount;        //MODIFY

    setBalance(b);         //WRITE

}

Suppose that we have only two updater threads. Thread 1 executes the loop, for example 10,000 times, until it reaches the end of its time slice and thread 2 starts executing the loop. The end of a slice might occur after the modify statement, but before the write statement. If that happens, the member variable balance has the value 1,000,000, but the local variable b has the value 1,000,100.

The execution now proceeds with thread 2, which increases the balance to, say, 2,000,000. When thread 1 resumes its execution, it overwrites the balance with the value in its local variable b. Be aware that local variables are local to a thread, i.e., each thread has its own b. Instead of continuing from 2,000,000, thread 1 now continues from 1,000,100, and all of thread 2’s updates are lost.

A single interrupt at the “wrong” place completely spoils the result of the program. The likelihood that the end of a time slice falls between the read and the write is quite low, because time slices end relatively rarely and, when one does, it usually lands on one of the many other statements the thread executes. Therefore, we need to increase the number of transactions to more than a million before we can observe any problem.

The read/modify/write sequence is traditionally called a critical (code) section, because it’s critical for the correctness of the program result. The result becomes incorrect when a timer interrupt falls between the read and the write; interrupts outside the critical section are harmless.
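The interleaving just described is hard to provoke on demand, but it can be replayed deterministically in a single thread by playing the scheduler by hand. This is a minimal sketch under stated assumptions: the class LostUpdateReplay is hypothetical, and the numbers (a balance of 1,000,000 when thread 1 is interrupted, and 10,000 updates by thread 2) are chosen to match the narrative above.

```java
/** Hypothetical demo: replays the lost-update interleaving in one thread, step by step. */
class LostUpdateReplay {
    private static int balance;

    static int replay() {
        balance = 1_000_000;

        // Thread 1: READ and MODIFY, then its time slice ends before WRITE
        int b1 = balance;            // READ   -> 1,000,000
        b1 = b1 + 100;               // MODIFY -> 1,000,100, held only in thread 1's local b

        // Thread 2 runs a whole time slice of complete read/modify/write updates
        for (int i = 0; i < 10_000; i++) {
            int b2 = balance;        // READ
            b2 = b2 + 100;           // MODIFY
            balance = b2;            // WRITE -> ends at 2,000,000
        }

        // Thread 1 resumes and performs its pending WRITE with a stale value
        balance = b1;                // 2,000,000 is overwritten by 1,000,100
        return balance;
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + replay()); // prints "Final balance = 1000100"
    }
}
```

A correct execution would end at 2,000,100; the replay ends at 1,000,100 because thread 1's stale WRITE erases everything thread 2 did.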

Solution

Java has a proper solution to the problem: declare the updateBalance method as synchronized. This keyword guarantees that only one thread at a time can execute inside the synchronized methods of a given object. If the section is occupied, all other threads must wait in a queue outside it.

I have added a subclass of Account. Here it is:

class SafeAccount extends Account {

    public synchronized void updateBalance(int amount) {

        super.updateBalance(amount);

    }

}

Running the demo once more with the last set of arguments and the new SafeAccount class gives the output below.

>java AccountUpdater -u 10 -t 10000000 +s

NumUpdaters    : 10

NumTransactions: 10000000

UseSynchronized: true

----------------

Final balance = 0

Well, it’s not a proof, but you have to trust me – it is the solution to the problem. Run the program with a large number of transactions and threads, and convince yourself.
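Since the complete AccountUpdater source is only part of the article's download, here is a minimal self-contained sketch that reproduces the experiment. Account and SafeAccount are the classes shown above; the Updater constructor and the driver class AccountUpdaterSketch with its run(useSynchronized, numUpdaters, numTransactions) method are hypothetical stand-ins for the -u, -t and +s command-line options.

```java
/** The article's Account, unchanged apart from comments. */
class Account {
    private int balance = 0;

    public void updateBalance(int amount) {
        int b = getBalance();  // READ
        b = b + amount;        // MODIFY
        setBalance(b);         // WRITE
    }

    public int getBalance() { return balance; }

    private void setBalance(int b) { balance = b; }
}

/** Only one thread at a time may run updateBalance on a given SafeAccount. */
class SafeAccount extends Account {
    @Override
    public synchronized void updateBalance(int amount) {
        super.updateBalance(amount);
    }
}

/** Deposits and then withdraws the same amount, so the net effect is zero. */
class Updater extends Thread {
    private final Account account;
    private final int numTransactions;

    Updater(Account account, int numTransactions) {
        this.account = account;
        this.numTransactions = numTransactions;
    }

    @Override
    public void run() {
        int amount = 100;
        for (int i = 0; i < numTransactions; i++) account.updateBalance(amount);
        for (int i = 0; i < numTransactions; i++) account.updateBalance(-amount);
    }
}

/** Hypothetical driver: parameters replace the article's command-line options. */
class AccountUpdaterSketch {
    static int run(boolean useSynchronized, int numUpdaters, int numTransactions) {
        Account account = useSynchronized ? new SafeAccount() : new Account();
        Updater[] updaters = new Updater[numUpdaters];
        for (int i = 0; i < numUpdaters; i++) {
            updaters[i] = new Updater(account, numTransactions);
            updaters[i].start();
        }
        for (Updater u : updaters) {
            try {
                u.join();  // join() also makes the updaters' writes visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return account.getBalance();
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + run(true, 10, 1_000_000));
    }
}
```

Calling run(false, ...) with a large number of transactions should, on most machines, reproduce the random final balances shown earlier, while run(true, ...) always ends at zero.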

Bear in mind that the execution time will be substantially longer than before. One reason is that we now have many more thread context switches. When a thread completes an updateBalance operation, leaves the critical section and comes back for the next turn in the loop, other threads are often already waiting, so the thread may have to queue up while another thread enters the section.

Instead of marking a method as synchronized, it’s possible to use a synchronized block. Such a block takes an object argument, whose lock is then used for synchronization. Here is an equivalent of the synchronized method above, using a synchronized block instead:

public void updateBalance(int amount) {

    synchronized (this) {

        super.updateBalance(amount);

    }

}

You can use an arbitrary object as the synchronization object, for example:

public void updateBalance(int amount) {

    synchronized (System.out) {

        super.updateBalance(amount);

    }

}

Sometimes you must use a synchronized block, but usually you should stick to a synchronized method, because it makes life a little bit simpler. Using a single global synchronization object, such as System.out, means that your program has exactly one critical section, shared by every update of every object, which is seldom what you want.
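A common middle ground between synchronizing on this and on a global object such as System.out is a private lock object owned by the class. The sketch below is my own illustration rather than code from the article; the names LockedAccount, lock and demo are hypothetical. Because no outside code can reach the lock, nobody can accidentally share it or block on it.

```java
/** Sketch of the private-lock idiom: the lock cannot be reached from outside the class. */
class LockedAccount {
    private final Object lock = new Object();  // hypothetical dedicated lock object
    private int balance = 0;                   // guarded by lock

    public void updateBalance(int amount) {
        synchronized (lock) {
            balance = balance + amount;        // the whole read/modify/write is inside the block
        }
    }

    public int getBalance() {
        synchronized (lock) {
            return balance;
        }
    }

    /** Runs numThreads threads that each add and then subtract 100, numTransactions times. */
    static int demo(int numThreads, int numTransactions) {
        LockedAccount account = new LockedAccount();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int k = 0; k < numTransactions; k++) account.updateBalance(100);
                for (int k = 0; k < numTransactions; k++) account.updateBalance(-100);
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return account.getBalance();
    }
}
```

Each LockedAccount still has its own critical section, as with synchronized methods, but the lock is no longer part of the object's public surface.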

Bug warning 1

Using the keyword this as the argument to a synchronized block may or may not be correct, depending on the context. Can you see why the usage below is wrong?

class Data {public int  value=0;}

 

class Updater extends Thread {

    private Data  d;

    public Updater(Data d) {this.d = d;}

    public void    run() {

        while (true) {

            synchronized (this) {d.value += 100;}

        }

    }

}

Inside run, this refers to the Updater thread object itself, so every updater thread is using its own synchronization object instead of a common one, and no mutual exclusion takes place at all. This kind of bug easily slips through a code review and can be very difficult to spot later, when the application has been shipped to the customers. You can easily correct the error by changing the statement to:

            synchronized (d) {d.value += 100;}

However, the best solution is to change the Data class to use synchronized methods instead.
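Moving the synchronization into the Data class itself might look like the sketch below. The method names add and get, the finite loop (instead of while (true)) and the driver DataDemo are hypothetical, introduced so that the example terminates and its result can be checked.

```java
/** Data now protects its own state; callers can no longer pick the wrong lock. */
class Data {
    private int value = 0;  // private: the only access is through the synchronized methods

    public synchronized void add(int amount) { value += amount; }  // complete read/modify/write

    public synchronized int get() { return value; }
}

class Updater extends Thread {
    private final Data d;
    private final int numTransactions;  // finite instead of while (true), so the demo ends

    public Updater(Data d, int numTransactions) {
        this.d = d;
        this.numTransactions = numTransactions;
    }

    @Override
    public void run() {
        for (int i = 0; i < numTransactions; i++) {
            d.add(100);  // no synchronized block needed here any more
        }
    }
}

/** Hypothetical driver that starts several updaters on one shared Data object. */
class DataDemo {
    static int run(int numThreads, int numTransactions) {
        Data d = new Data();
        Updater[] updaters = new Updater[numThreads];
        for (int i = 0; i < numThreads; i++) {
            updaters[i] = new Updater(d, numTransactions);
            updaters[i].start();
        }
        for (Updater u : updaters) {
            try {
                u.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return d.get();  // numThreads * numTransactions * 100
    }
}
```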

Bug warning 2

Using synchronized methods is not a silver bullet; you must also use them properly. Suppose, for example, that you have a class with synchronized public methods and add a new public method, but forget to declare it synchronized. Now you have created a backdoor into the critical section.

This bug can be very hard to find, because a class in general has many methods, and not all of them must be synchronized. Therefore, you can’t simply say “make all methods synchronized”, although you might want to adopt the rule “make all public methods synchronized”.

Bug warning 3

I’m sorry to tell you that it doesn’t stop there either. Take a look at this class, which has all of its methods declared synchronized.

class Account {

    private int    balance=0;

    public synchronized void   setBalance(int b) {balance=b;}

    public synchronized int    getBalance()      {return balance;}

}

The class declaration looks flawless. However, what do you say about this usage?

void  performUpdate(Account a) {

    a.setBalance(a.getBalance() + 100);

}

The two methods are synchronized, but the composite read/modify/write operation is not: another thread can update the balance between the call to getBalance and the call to setBalance. In fact, this is a reincarnation of our original problem. Therefore, you must always analyse how a synchronized class will be used. The easy answer is to skip setters and stick to synchronized getters and updaters, where an updater method takes an argument and performs the complete read/modify/write operation.
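Applied to the Account class above, the advice might look like the following sketch: the setter is gone, and an updater, updateBalance, performs the whole read/modify/write while holding the lock. The class ConcurrentUpdateDemo and its run method are hypothetical, added only to exercise the class from several threads.

```java
/** Account with a synchronized getter and a synchronized updater, but no public setter. */
class Account {
    private int balance = 0;

    /** The whole read/modify/write happens while holding this object's lock. */
    public synchronized void updateBalance(int amount) { balance = balance + amount; }

    public synchronized int getBalance() { return balance; }
}

class ConcurrentUpdateDemo {
    /** The corrected performUpdate: one call, one critical section. */
    static void performUpdate(Account a) {
        a.updateBalance(100);
    }

    /** Hypothetical check: numThreads threads each call performUpdate the given number of times. */
    static int run(int numThreads, int calls) {
        Account account = new Account();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int k = 0; k < calls; k++) performUpdate(account);
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return account.getBalance();
    }
}
```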

Now you know a lot about the usage of synchronized. However, we have not yet reached our original objective: how to send data from one thread to another. I started this section with the observation that two threads that want to communicate must agree on a common place where one can put data and the other can get it. The Account class is a typical common place, and we can use it as the basis for our next class, a mailbox.

Problem 2 – Why you need to use wait and notify

Consider two threads, Producer and Consumer, which want to communicate using a common place (a mailbox). Let’s rename the last version of class Account and change the name of the member variable. Here is the new class, called a mailbox:

class MailBox {

    private int    msg = 0;

    public synchronized void put(int msg) {this.msg=msg;}

    public synchronized int  get()        {return msg;}

}

The producer thread sends a stream of 1’s to the mailbox, by calling the method put and the consumer is receiving these 1’s by calling the method get on the mailbox object.

Below you can find the class declarations for both the producer and the consumer

class Producer extends Thread {

    private MailBox   out;

    private int       numMessages;

    public Producer(int numMessages, MailBox out) {

        this.out         = out;

        this.numMessages = numMessages;

    }

    public void run() {

        for (int i = 0; i < numMessages; i++) {

            out.put(+1);

        }

        out.put(-1);

        System.out.println("num = "+numMessages);

    }

}

 

class Consumer extends Thread {

    private MailBox    in;

    public Consumer(MailBox in) {

        this.in = in;

    }

    public void run() {

        int  sum=0, cnt=0, n;

        while ((n = in.get()) >= 0) {

            sum += n;

            cnt++;

        }

        System.out.println("sum = " + sum);

        System.out.println("cnt = " + cnt);

    }

}

If the producer sends 1000 1’s, you probably expect the values of sum and cnt in the consumer to be 1000. Right? So, let’s run the program and check. I have modified the output for clarity. You can find a link to the source code at the end of this article, where you can also find a build script and run instructions.

>java NumberCounter

NumMessages   : 1000

----------------

Producer: num = 1000

Consumer: sum = 0

Consumer: cnt = 527361

 

>java NumberCounter -m 10000

NumMessages   : 10000

----------------

Producer: num = 10000

Consumer: sum = 0

Consumer: cnt = 0

 

>java NumberCounter -m 1000000

NumMessages   : 1000000

----------------

Producer: num = 1000000

Consumer: sum = 899622

Consumer: cnt = 1424308

Interesting, eh…? In these runs, the sum of the 1’s differs from the number of 1’s sent, and the number of turns in the consumer’s loop differs from the sum of the 1’s. Strange, to say the least.

Analysis

Look at the program code once more. The msg member variable is initialised to 0. If the consumer starts executing, it will read the value 0 from the mailbox several times. The sum value remains zero, but the cnt value increases. Eventually, the consumer’s time slice ends and the producer starts executing. For a moderate number of messages, the producer can complete its for loop during one time slice and, just before terminating, put the value –1 in the mailbox. The consumer then resumes, picks up the –1 and breaks its while loop.

If the producer starts executing first, it is likely to complete its for loop and leave a –1 in the mailbox, which then causes the consumer to break its while loop immediately.

If the number of messages is large, the two threads will switch a couple of times, so the consumer sees some of the 1’s, which results in a positive value of the sum variable. Nothing prevents the consumer from reading the same 1 several times, or the producer from overwriting a 1 that was never read, because put simply overwrites the previous value.

Solution

Java has, of course, a solution to this problem. Every class inherits directly or indirectly from the class java.lang.Object. The Object class contains, among others, two methods of importance in this context. The wait method suspends the calling thread, and the notify method resumes another thread that has been suspended by a previous wait on the same object. These methods must be called from inside a synchronized block (or method) on that object; otherwise an IllegalMonitorStateException is thrown. Nevertheless, how can this solve our problem?

The root cause of the incorrect result is that the two threads don’t wait for each other. When the producer has sent one message, it should wait until the consumer has received it. Moreover, the consumer must wait for the producer to send the next message. This is generally called event synchronization: a thread might wait for an event (or condition) to occur, and another thread might cause the event to occur (notify).

Let’s apply this insight and implement a subclass of MailBox that has event synchronization. The events (or conditions) to wait for are that the mailbox becomes empty (for the producer) and that it becomes full (for the consumer). The producer calls put and suspends itself if the mailbox is already full, and the consumer suspends itself if the mailbox is empty when it calls get. Here is the subclass:

class SafeMailBox extends MailBox {

    private boolean isFull = false;

 

    public synchronized void put(int msg) {

        while (isFull)

            try{ wait(); }catch(InterruptedException x) {}

        super.put(msg);

        isFull = true;

        notify();

    }

 

    public synchronized int get() {

        while (!isFull)

            try{ wait(); }catch(InterruptedException x) {}

        int msg = super.get();

        isFull = false;

        notify();

        return msg;

    }

}

At the end of each method, notify is called to tell the other side that its waiting condition has changed. When a thread resumes after a wait, it rechecks the condition to be sure. This is a good habit and should always be applied instead of a simpler if statement: another thread may have changed the state in between, and the Java specification even permits so-called spurious wakeups, where wait returns without any notify. If we re-run the program now with the new mailbox implementation, it works as expected. Try it yourself! You can download the source code at the end of this article. Here is the output:

>java NumberCounter -m 1000 +s

NumMessages   : 1000

UseWait&Notify: true

----------------

Producer: num = 1000

Consumer: sum = 1000

Consumer: cnt = 1000
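For reference, here is a self-contained sketch that wires the article's MailBox and SafeMailBox to the counting producer and consumer. The driver NumberCounterSketch and its run method are hypothetical stand-ins for the NumberCounter program with the +s option, and the consumer's sum and cnt are kept as fields (rather than local variables) so the driver can read them after join().

```java
class MailBox {
    private int msg = 0;
    public synchronized void put(int msg) { this.msg = msg; }
    public synchronized int  get()        { return msg; }
}

class SafeMailBox extends MailBox {
    private boolean isFull = false;

    @Override
    public synchronized void put(int msg) {
        while (isFull)                       // wait until the consumer has emptied the box
            try { wait(); } catch (InterruptedException x) {}
        super.put(msg);
        isFull = true;
        notify();                            // wake the consumer if it is waiting
    }

    @Override
    public synchronized int get() {
        while (!isFull)                      // wait until the producer has filled the box
            try { wait(); } catch (InterruptedException x) {}
        int msg = super.get();
        isFull = false;
        notify();                            // wake the producer if it is waiting
        return msg;
    }
}

class Producer extends Thread {
    private final MailBox out;
    private final int numMessages;

    Producer(int numMessages, MailBox out) { this.out = out; this.numMessages = numMessages; }

    @Override
    public void run() {
        for (int i = 0; i < numMessages; i++) out.put(+1);
        out.put(-1);                         // end-of-stream marker
    }
}

class Consumer extends Thread {
    private final MailBox in;
    int sum = 0, cnt = 0;                    // read by the driver only after join()

    Consumer(MailBox in) { this.in = in; }

    @Override
    public void run() {
        int n;
        while ((n = in.get()) >= 0) { sum += n; cnt++; }
    }
}

/** Hypothetical driver: returns {sum, cnt} as seen by the consumer. */
class NumberCounterSketch {
    static int[] run(int numMessages) {
        MailBox box = new SafeMailBox();
        Consumer c = new Consumer(box);
        Producer p = new Producer(numMessages, box);
        c.start();
        p.start();
        try {
            p.join();
            c.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { c.sum, c.cnt };
    }

    public static void main(String[] args) {
        int[] r = run(1000);
        System.out.println("sum = " + r[0] + ", cnt = " + r[1]);
    }
}
```

With exactly one producer and one consumer, notify is sufficient here, because the only thread that can be waiting is the other party.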

The new mailbox contains the minimum amount of code needed for two threads to safely exchange messages. There are variations on the theme, as we will see later in this article, but no shortcuts. Every attempt to leave out wait/notify will cause a bug.

About the concept monitor

In concurrency theory, an object using synchronized and wait/notify is usually referred to as a monitor. A monitor consists of (shared) data and a set of access methods. The only way to access the data is to call one of the access methods, and because the access methods are mutually exclusive, only one thread at a time can be executing inside a monitor. To better understand the semantics of a monitor, look at the illustration below.

A monitor (synchronized object) uses two queues and a lock to realize its semantics. When a thread calls one of the access methods, the lock is inspected. If it is unlocked, the thread becomes the lock owner and the monitor is now locked until the thread leaves the monitor. If the monitor is already locked, the thread is suspended and inserted into the first queue (enterQ).

A thread executing inside a monitor might want to check a synchronization event (condition) and, if needed, suspend itself until the event occurs. In Java, this is done with the wait method, and the thread is inserted into the second queue (waitQ). The monitor is also unlocked, to allow another thread in enterQ to acquire it.

Eventually, a thread inside the monitor calls the method notify, which moves one thread from waitQ to the front of enterQ. Once the notifying thread has left the monitor (or released it by calling wait itself), the awoken thread can re-acquire the monitor and resume its execution after the wait statement; exactly when it gets to run depends on the scheduler, for example on the notifier’s time slice ending. The explanation above contains some simplifications, but it serves as a good model for monitors.

Java implements a so-called resume&continue monitor, which means that the notifying thread wakes up a thread in waitQ but continues executing inside the monitor. The alternative is a resume&wait monitor, where the notifying thread is suspended and the awoken thread starts executing inside the monitor. A third alternative is a so-called implicit resume monitor. This variant lacks wait and notify primitives. Instead, it has entrance conditions (called guards) and re-checks them every time a thread leaves the monitor. This variant is an important concurrency building block in the real-time programming language Ada95.
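The monitor model also explains the rule from Problem 2: wait and notify operate on the monitor’s waitQ, so the calling thread must own the monitor’s lock. The sketch below checks this directly; the class MonitorOwnershipDemo and its method names are hypothetical. It calls notify without holding the lock, and then calls a timed wait while holding it, using Thread.holdsLock to confirm that the lock is re-acquired when wait returns.

```java
/** Hypothetical demo: wait/notify require the caller to own the object's monitor. */
class MonitorOwnershipDemo {

    /** Returns true if notify() fails when the caller does not hold the lock. */
    static boolean notifyWithoutLockFails() {
        Object monitor = new Object();
        try {
            monitor.notify();                  // not inside synchronized (monitor)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Returns true if a timed wait() works, and the lock is held again, when the caller owns it. */
    static boolean timedWaitWithLockWorks() {
        Object monitor = new Object();
        synchronized (monitor) {               // we now own the monitor
            try {
                monitor.wait(10);              // releases the lock, sleeps up to 10 ms, re-acquires it
                return Thread.holdsLock(monitor);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("notify without lock fails: " + notifyWithoutLockFails());
        System.out.println("wait with lock works:      " + timedWaitWithLockWorks());
    }
}
```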

Now you know how to set up a communication link between two threads, but not between more than two! Read on to the next problem to understand why.

Problem 3 – Why you should use notifyAll instead of notify

Let’s modify our latest program a little bit, so that we allow more than one producer and let each of them send the sequence 1, 2, …, n. The (single) consumer adds up all numbers and prints the result when the program terminates. With p producer threads, the result should be p * n * (n+1) / 2.

I’m using the same mailbox as before, slightly modified. Here it is.

class MailBox {

    protected int       data   = 0;

    protected boolean   isFull = false;

    public synchronized void put(int msg) {

        while (isFull)

            try{ wait(); }catch(InterruptedException x) {}

        data   = msg;

        isFull = true;

        doNotify();

    }

    public synchronized int get() {

        while (!isFull)

            try{ wait(); }catch(InterruptedException x) {}

        int msg = data;

        isFull  = false;

        doNotify();

        return msg;

    }

    protected void doNotify() {notify();}

}

Here are the producer and consumer thread classes. I have simplified the code for clarity. You can find a link to the complete source code at the end of this article.

class Producer extends Thread {

    private MailBox out;

    private int numMessages;

    public Producer(int numMessages, MailBox out) {

        this.out         = out;

        this.numMessages = numMessages;

    }

    public void run() {

        for (int i = 1; i <= numMessages; i++) {

            out.put(i);

        }

    }

}

 

class Consumer extends Thread {

    private MailBox     in;

    private int         sum = 0;

    public Consumer(MailBox in) {

        this.in = in;

    }

    public void run() {

        int n;

        while ((n = in.get()) >= 0) {

            sum += n;

        }

        System.out.println("sum = " + sum);

    }

}

The main program is responsible for creating a mailbox, the consumer, and a number of producers. It then waits for the completion of all producers, sends a –1 via the mailbox to the consumer and waits for the completion of the consumer. Now, let’s run the program with two producers and see what happens.

1) >java NumberSequencer +v -p 2

2) NumProducers: 2

3) Max         : 10

4) UseNotifyAll: false

5) Consumer: Started

6) Consumer: Mailbox<data=0, isFull=false> get(): enter

7) Consumer: Mailbox<data=0, isFull=false> get(): before wait

8) Producer-1: Started

9) Producer-1: OUT 1

10) Producer-1: Mailbox<data=0, isFull=false> put(msg=1) enter

11) Producer-1: Mailbox<data=1, isFull=true> put(msg=1) before notify

12) Producer-1: Mailbox<data=1, isFull=true> put(msg=1) exit

13) Producer-1: OUT 2

14) Producer-1: Mailbox<data=1, isFull=true> put(msg=2) enter

15) Producer-2: Started

16) Producer-2: OUT 1

17) Producer-1: Mailbox<data=1, isFull=true> put(msg=2) before wait

18) Consumer: Mailbox<data=1, isFull=true> get(): after wait

19) Consumer: Mailbox<data=1, isFull=false> get(): before notify

20) Consumer: Mailbox<data=1, isFull=false> get(): exit

21) Producer-2: Mailbox<data=1, isFull=false> put(msg=1) enter

22) Producer-2: Mailbox<data=1, isFull=true> put(msg=1) before notify

23) Producer-2: Mailbox<data=1, isFull=true> put(msg=1) exit

24) Producer-2: OUT 2

25) Producer-1: Mailbox<data=1, isFull=true> put(msg=2) after wait

26) Producer-1: Mailbox<data=1, isFull=true> put(msg=2) before wait

27) Producer-2: Mailbox<data=1, isFull=true> put(msg=2) enter

28) Consumer: IN 1

29) Producer-2: Mailbox<data=1, isFull=true> put(msg=2) before wait

30) Consumer: Mailbox<data=1, isFull=true> get(): enter

31) Consumer: Mailbox<data=1, isFull=false> get(): before notify

32) Consumer: Mailbox<data=1, isFull=false> get(): exit

33) Consumer: IN 1

34) Consumer: Mailbox<data=1, isFull=false> get(): enter

35) Consumer: Mailbox<data=1, isFull=false> get(): before wait

36) Producer-1: Mailbox<data=1, isFull=false> put(msg=2) after wait

37) Producer-1: Mailbox<data=2, isFull=true> put(msg=2) before notify

38) Producer-1: Mailbox<data=2, isFull=true> put(msg=2) exit

39) Producer-1: OUT 3

40) Producer-2: Mailbox<data=2, isFull=true> put(msg=2) after wait

41) Producer-2: Mailbox<data=2, isFull=true> put(msg=2) before wait

42) Producer-1: Mailbox<data=2, isFull=true> put(msg=3) enter

43) Producer-1: Mailbox<data=2, isFull=true> put(msg=3) before wait

If you try it yourself, you will see a sequence similar to the one above, and then nothing. The program seems to hang every time. What has happened?

Analysis

Recall the monitor model from the previous section. Let’s trace the execution from the system state where both producers are waiting in waitQ and the consumer enters the monitor (line 30 in the listing above and picture 1 below).

The consumer takes the value 1 from the mailbox, notifies the first thread in waitQ, which is producer 1, and leaves the monitor. The consumer manages to re-enter the monitor, but finds the mailbox empty and calls wait (line 35 in the listing above). This allows producer 1 to resume its execution after wait (line 36 and figure 2).

Producer 1 inserts the value 2 into the mailbox and notifies the first thread in waitQ, producer 2 (line 37). It then leaves the monitor (line 38). This allows producer 2 to wake up inside the monitor, after wait (line 40 and figure 3). However, it discovers that the mailbox is full and goes to sleep again (line 41). Producer 1 enters the monitor again, because it wants to send the value 3. Unfortunately, the mailbox is still full and producer 1 goes to sleep too (line 43 and figure 4). Now all three threads are in waitQ and nobody is left to call notify, so this system state remains forever…

The problem in this scenario is that when the wrong thread is resumed from waitQ, it goes back to sleep without passing the notification on, so the thread that could have made progress, in this case the consumer, is never woken up.

Solution

Luckily, Java has a solution to this problem too: we can use the method notifyAll instead of notify. This method wakes up all threads in waitQ; they then re-acquire the monitor one at a time, and each one re-checks its own condition, so a thread that can make progress, such as the consumer, is no longer left asleep. Let’s make a subclass of MailBox that fixes the problem. Here it is:

class SafeMailBox extends MailBox {

    protected void doNotify() {notifyAll();}

}

Running the program once again, now with a mailbox that notifies all threads in waitQ, gives the expected result.

>java NumberSequencer -v -p 2 +s

NumProducers: 2

Max         : 10

UseNotifyAll: true

Consumer: Started

Producer-1: Started

Producer-1: Terminated

Producer-2: Started

Producer-2: Terminated

Consumer: Terminated

----------------

Consumer: sum = 110

Expected  sum = 110

 

>java NumberSequencer -v -p 20 +s

NumProducers: 20

Max         : 10

UseNotifyAll: true

----------------

Consumer: sum = 1100

Expected  sum = 1100

 

>java NumberSequencer -v -p 100 +s -m

NumProducers: 100

Max         : 100

UseNotifyAll: true

----------------

Consumer: sum = 505000

Expected  sum = 505000
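The runs above can be reproduced with a sketch like the following. It uses the MailBox from this problem with the notifyAll fix (SafeMailBox), several producers each sending 1, 2, …, n, and a driver that, like the article’s main program, joins all producers before sending -1 to the consumer. NumberSequencerSketch with its run and expected methods is hypothetical; expected computes p * n * (n+1) / 2, for example 2 * 10 * 11 / 2 = 110.

```java
class MailBox {
    protected int     data   = 0;
    protected boolean isFull = false;

    public synchronized void put(int msg) {
        while (isFull)
            try { wait(); } catch (InterruptedException x) {}
        data   = msg;
        isFull = true;
        doNotify();
    }

    public synchronized int get() {
        while (!isFull)
            try { wait(); } catch (InterruptedException x) {}
        int msg = data;
        isFull  = false;
        doNotify();
        return msg;
    }

    protected void doNotify() { notify(); }     // can hang with several producers
}

class SafeMailBox extends MailBox {
    @Override
    protected void doNotify() { notifyAll(); }  // every waiter re-checks its own condition
}

class Producer extends Thread {
    private final MailBox out;
    private final int numMessages;

    Producer(int numMessages, MailBox out) { this.out = out; this.numMessages = numMessages; }

    @Override
    public void run() {
        for (int i = 1; i <= numMessages; i++) out.put(i);
    }
}

class Consumer extends Thread {
    private final MailBox in;
    long sum = 0;                               // read by the driver after join()

    Consumer(MailBox in) { this.in = in; }

    @Override
    public void run() {
        int n;
        while ((n = in.get()) >= 0) sum += n;
    }
}

/** Hypothetical driver mirroring the article's main program. */
class NumberSequencerSketch {
    static long run(int numProducers, int max) {
        MailBox box = new SafeMailBox();
        Consumer consumer = new Consumer(box);
        consumer.start();
        Producer[] producers = new Producer[numProducers];
        for (int i = 0; i < numProducers; i++) {
            producers[i] = new Producer(max, box);
            producers[i].start();
        }
        try {
            for (Producer p : producers) p.join();
            box.put(-1);                        // all producers are done: tell the consumer to stop
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumer.sum;
    }

    static long expected(int numProducers, int max) {
        return (long) numProducers * max * (max + 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + run(20, 10) + ", expected = " + expected(20, 10));
    }
}
```

Replacing new SafeMailBox() with new MailBox() in run should make the hang described in the analysis likely, although, as with any race, not certain on every run.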

The final mailbox

At last, we have a robust component for inter-thread communication. The resulting mailbox is shown below. I have taken the liberty of changing the message type to Object, which makes it a little more versatile.

class MailBox {

    protected Object    data   = null;

    protected boolean   isFull = false;

 

    public synchronized void put(Object msg) {

        while (isFull == true)

            try{ wait(); }catch(InterruptedException x) {}

 

        data   = msg;

        isFull = true;

        notifyAll();

    }

 

    public synchronized Object get() {

        while (isFull == false)

            try{ wait(); }catch(InterruptedException x) {}

 

        Object msg = data;

        isFull     = false;

        notifyAll();

        return msg;

    }

}
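Because the message type is now Object, any reference can travel through the mailbox, including a dedicated end-of-stream object instead of the -1 convention. The sketch below is my own illustration, not code from the article: the class ObjectMailBoxDemo and the END marker are hypothetical, and the final MailBox is repeated so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** The article's final MailBox, repeated so that this example is self-contained. */
class MailBox {
    protected Object  data   = null;
    protected boolean isFull = false;

    public synchronized void put(Object msg) {
        while (isFull)
            try { wait(); } catch (InterruptedException x) {}
        data   = msg;
        isFull = true;
        notifyAll();
    }

    public synchronized Object get() {
        while (!isFull)
            try { wait(); } catch (InterruptedException x) {}
        Object msg = data;
        isFull     = false;
        notifyAll();
        return msg;
    }
}

class ObjectMailBoxDemo {
    /** Hypothetical end-of-stream marker, compared by identity, so it cannot clash with real messages. */
    private static final Object END = new Object();

    /** Sends every message to a consumer thread and returns what it received, in order. */
    static List<Object> transfer(List<?> messages) {
        MailBox box = new MailBox();
        List<Object> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            Object msg;
            while ((msg = box.get()) != END) received.add(msg);
        });
        consumer.start();
        for (Object m : messages) box.put(m);
        box.put(END);
        try {
            consumer.join();   // after join(), the consumer's additions are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(Arrays.asList("hello", 42, 3.14)));
    }
}
```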

Problem 4 – Why you should encapsulate a mailbox on the receiving side

Our fourth problem is not a problem in the same sense as the previous three. It’s more a matter of convenience than of correctness. However, choosing a complicated solution over a simpler one might result in a bug.

Let’s study a slightly more complicated situation than before. Instead of a single communicating pair, we have a long pipeline of communicating threads. A producer sends a message to a transformer, which does something and forwards it to another transformer, and eventually the pipeline ends with a consumer that receives the result. Setting up this pipeline requires managing both threads of different flavours and a mailbox between every pair of threads. This is cumbersome, to say the least, as well as inflexible and tedious.

The solution is to move the mailbox into the receiving side, as a member variable, and supply a public method that takes care of the details. Here is an example illustrating the general idea:

class Consumer extends Thread {

    private MailBox   inbox = new MailBox();

 

    public void    send(int msg) {inbox.put( Integer.valueOf(msg) );}

    private int    receive() {return ((Integer)inbox.get()).intValue();}

 

    public void  run() {

        int n;

        while ((n = receive()) >= 0) { /* process n */ }

    }

}

class Producer extends Thread {

    private Consumer    next;

 

    public Producer(Consumer c) {next=c;}

 

    public void  run() {

        for (int k = 1; k <= 10; k++) { next.send(k); }  // for example, 1..10

        next.send(-1);                                   // end of stream

    }

}

The advantages of this arrangement are that 1) the receiving side encapsulates the synchronization logic, including packing and unpacking of envelope objects, and 2) the sending side calls the other thread with ordinary method-invocation semantics. If we later want to change the mailbox into another kind of object (see the next sections in this article), the change is completely transparent to the sending side.

The send method of the consumer acts as a rendezvous point between a communicating thread pair. It is important to observe that the method is executed in the context of the sending side, i.e. not in the consumer thread’s context.
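The claim that send runs in the sender’s thread is easy to check. The self-contained sketch below assumes a single-slot mailbox that behaves like the article’s Mailbox; SlotMailbox, EchoConsumer and SendContextDemo are hypothetical names. It simply records the name of the thread that executes send and the one that executes receive.

```java
// Minimal single-slot mailbox, assumed to behave like the article's Mailbox.
class SlotMailbox {
    private Object slot;
    private boolean full = false;

    public synchronized void put(Object msg) throws InterruptedException {
        while (full) wait();            // wait until the slot is free
        slot = msg;
        full = true;
        notifyAll();
    }

    public synchronized Object get() throws InterruptedException {
        while (!full) wait();           // wait until a message arrives
        Object msg = slot;
        slot = null;
        full = false;
        notifyAll();
        return msg;
    }
}

// Receiver that owns its mailbox and records which thread runs each method.
class EchoConsumer extends Thread {
    private final SlotMailbox inbox = new SlotMailbox();
    volatile String sendRanIn;          // name of the thread that executed send()
    volatile String receiveRanIn;       // name of the thread that executed receive()
    volatile int lastValue;

    public void send(int msg) throws InterruptedException {
        sendRanIn = Thread.currentThread().getName();
        inbox.put(msg);                 // autoboxed to Integer
    }

    private int receive() throws InterruptedException {
        receiveRanIn = Thread.currentThread().getName();
        return (Integer) inbox.get();
    }

    public void run() {
        try {
            int n;
            while ((n = receive()) >= 0) lastValue = n;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class SendContextDemo {
    // Sends 42 and then the terminator -1 from a thread named "sender".
    static EchoConsumer runDemo() {
        final EchoConsumer consumer = new EchoConsumer();
        consumer.setName("consumer");
        consumer.start();
        Thread sender = new Thread(() -> {
            try {
                consumer.send(42);
                consumer.send(-1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sender");
        sender.start();
        try {
            sender.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return consumer;
    }

    public static void main(String[] args) {
        EchoConsumer c = runDemo();
        System.out.println("send() ran in " + c.sendRanIn + ", receive() ran in " + c.receiveRanIn);
    }
}
```

Running main reports that send ran in the thread named sender and receive in the thread named consumer, even though both methods belong to the consumer object.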

The pipeline problem mentioned above requires two kinds of receivable threads: a consumer and a transformer. This calls for an abstraction, expressed as an interface and an abstract class that implements it.

interface Receivable {

    void  send(int msg);

}

 

abstract class AbstractReceivable

       extends Thread

    implements Receivable

{

    private Mailbox inbox = new Mailbox();

    public void     send(int msg) {inbox.put(new Integer(msg));}

    protected int   receive() {return ((Integer)inbox.get()).intValue();}

}

By extending the abstract class, we can define the two receivables. I show all three thread classes below, plus a method that assembles a pipeline.

class Consumer extends AbstractReceivable {

    public void    run() {

        int msg;

        while ((msg = receive()) >= 0) {

            System.out.println("Consumer: "+msg);

        }

    }

}

 

class Transformer extends AbstractReceivable {

    private Receivable    next;

    public Transformer(Receivable n) {next=n;}

    public void    run() {

        int msg;

        while ((msg = receive()) >= 0) {

            next.send(2 * msg);

        }

        next.send(-1);

    }

}

 

class Producer extends Thread {

    private Receivable    next;

    private int           N;

    public Producer(int N, Receivable r) {this.N=N; next=r;}

    public void    run() {

        for (int i=1; i<=N; ++i) {

            next.send(i);

        }

        next.send(-1);

    }

}

 

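class DoublerPipeline {

    public void    start(int numTransformers, int N) {

        Consumer      c = new Consumer();

        c.start();

        Receivable    r = c;

        for (int i=1; i<=numTransformers; ++i) {

            Transformer   t = new Transformer(r);

            t.start();    // every thread in the chain must be started

            r = t;

        }

        Producer  p = new Producer(N, r);

        p.start();

    }

}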

Here is a printout from running the program with 10 transformers (doublers) and the producer sending the values 1, 2, …, N, here with N = 100. The last value received is therefore 100 × 2^10 = 102400.

>java DoublerPipeline -d 10 -m 100

NumDoublers: 10

Max        : 100

----------------

Consumer: Last received = 102400

Expected  last received = 102400
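For experimentation, here is a self-contained version of the doubler pipeline. It is a sketch, not the article’s program: an ArrayBlockingQueue of capacity 1 (from java.util.concurrent, Java 5 and later) stands in for the single-slot Mailbox, the names IntReceivable, InboxThread, Doubler and DoublerPipelineDemo are hypothetical, the calling thread plays the producer, and the consumer only remembers the last value it received.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

interface IntReceivable {
    void send(int msg) throws InterruptedException;
}

// The receiving side owns a single-slot inbox; ArrayBlockingQueue(1) stands in for Mailbox.
abstract class InboxThread extends Thread implements IntReceivable {
    private final BlockingQueue<Integer> inbox = new ArrayBlockingQueue<>(1);

    public void send(int msg) throws InterruptedException { inbox.put(msg); }

    protected int receive() throws InterruptedException { return inbox.take(); }
}

class LastValueConsumer extends InboxThread {
    volatile int lastReceived = -1;

    public void run() {
        try {
            int msg;
            while ((msg = receive()) >= 0) lastReceived = msg;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Doubler extends InboxThread {
    private final IntReceivable next;

    Doubler(IntReceivable next) { this.next = next; }

    public void run() {
        try {
            int msg;
            while ((msg = receive()) >= 0) next.send(2 * msg);
            next.send(-1);                       // forward the end-of-stream marker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class DoublerPipelineDemo {
    // Builds producer -> numDoublers doublers -> consumer, starting every thread,
    // and returns the last value the consumer received.
    static int run(int numDoublers, int n) {
        LastValueConsumer consumer = new LastValueConsumer();
        consumer.start();
        IntReceivable head = consumer;
        for (int i = 0; i < numDoublers; i++) {
            Doubler d = new Doubler(head);
            d.start();
            head = d;
        }
        try {
            for (int i = 1; i <= n; i++) head.send(i);   // this thread acts as the producer
            head.send(-1);
            consumer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return consumer.lastReceived;
    }

    public static void main(String[] args) {
        System.out.println("Last received = " + run(10, 100));
    }
}
```

With 10 doublers and N = 100, run returns 102400, the same value as in the printout above.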

Finding prime numbers

The benefits of moving the synchronization logic into the receiving side should not be underestimated: thread communication becomes as simple as object communication. Below I show you a program that lists all prime numbers between 1 and N. The algorithm is very inefficient if you are truly interested in finding prime numbers, but it illustrates the threading concept elegantly. It is also a good benchmark of the threading performance of a language or thread library, and I have implemented it in many different languages and libraries over the years.

The program builds upon the classical algorithm using a chain of prime number sieves. Every sieve is a thread, and the chain gets longer for every prime number found. The main program sends the number sequence 3, 4, …, N to the first sieve, which represents the prime number 2.

A sieve thread receives a number and checks whether it is divisible by the sieve’s prime number. If it is not, the number is sent on to the next sieve in the chain. The first time this happens, there is no next sieve yet, so a new sieve thread is created for that number, which must be prime because it passed every earlier sieve. The first thing a new sieve does is print the prime number it represents.

public class PrimeNumbers {

    public static void main(String[] args) {. . . }

 

    public void    start(int max) {

        PrimeNumberSieve    next = new PrimeNumberSieve(1, 2);

        for (int i=3; i<=max; ++i) {

            next.put(i);

        }

        next.put(-1);

    }

 

    private static class PrimeNumberSieve extends Thread {

        private Mailbox             inbox = new Mailbox();

        private PrimeNumberSieve    next  = null;

        private int                 order;

        private int                 prime;

 

        public PrimeNumberSieve(int n, int prime) {

            this.order = n;

            this.prime = prime;

            start();

        }

        public void run() {

            System.out.println(order + ": " + prime);

            int  n;

            while ((n = get()) > 0) {

                if (n % prime != 0) {

                    if (next == null)

                        next = new PrimeNumberSieve(order+1, n);

                    else

                        next.put(n);

                }

            }

            if (next != null) next.put(-1);

        }

        public void    put(int n) {inbox.put(new Integer(n));}

        private int    get() {return ((Integer)inbox.get()).intValue();}

    }

}
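To try the sieve chain without the rest of the article’s code, here is a self-contained sketch. It is an adaptation, not the program above: an ArrayBlockingQueue of capacity 1 (java.util.concurrent, Java 5 and later) stands in for the single-slot mailbox, the names SieveStage and SieveDemo are hypothetical, and each stage records its prime in a shared thread-safe list instead of printing it. Each stage also joins its successor after passing on the terminator, so the caller only needs to join the first stage.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// One thread per prime; each stage forwards the numbers its prime does not divide.
class SieveStage extends Thread {
    private final BlockingQueue<Integer> inbox = new ArrayBlockingQueue<>(1); // single slot
    private final int prime;
    private final List<Integer> found;   // thread-safe list shared by all stages
    private SieveStage next;             // only touched by this stage's own thread

    SieveStage(int prime, List<Integer> found) {
        this.prime = prime;
        this.found = found;
    }

    void put(int n) throws InterruptedException { inbox.put(n); }

    public void run() {
        found.add(prime);                // record the prime instead of printing it
        try {
            int n;
            while ((n = inbox.take()) > 0) {
                if (n % prime != 0) {
                    if (next == null) {
                        next = new SieveStage(n, found);  // n passed every earlier sieve
                        next.start();
                    } else {
                        next.put(n);
                    }
                }
            }
            if (next != null) {
                next.put(-1);            // propagate the end-of-stream marker
                next.join();             // wait for the rest of the chain
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class SieveDemo {
    // Returns the primes 2..max in ascending order, using one sieve thread per prime.
    static List<Integer> primesUpTo(int max) {
        List<Integer> found = new CopyOnWriteArrayList<>();
        SieveStage first = new SieveStage(2, found);
        first.start();
        try {
            for (int i = 3; i <= max; i++) first.put(i);
            first.put(-1);
            first.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return found;
    }

    public static void main(String[] args) {
        List<Integer> primes = primesUpTo(1000);
        System.out.println(primes.size() + " primes, largest " + primes.get(primes.size() - 1));
    }
}
```

The list comes out in ascending order because each stage records its prime before it can create its successor. For max = 1000 the sketch finds 168 primes, the largest being 997, in line with the printout below.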

Here is a printout of one execution of the program, listing all prime numbers below 1000.

>java PrimeNumbers -m 1000

1: 2

2: 3

3: 5

4: 7

5: 11

6: 13

. . .

165: 977

166: 983

167: 991

168: 997

In the listing above, you can see that the program has created 168 threads, one sieve per prime. A number may therefore have to pass through more than 160 threads before it reaches the end of the chain, with a thread context switch at every hop. Moreover, because each mailbox has a single slot, a thread cannot send a new number until the next thread has taken the previous one. In this hobby program it doesn’t matter, but a real program might have a chain of different threads with different processing times. Put plainly, the slowest thread in the chain then determines the processing speed of the whole chain.

If the processing times are fixed, this is no problem, because the chain cannot operate faster than its slowest thread anyway. If the times vary, however, it is inefficient to always wait for the slowest thread in the chain. This adds a new requirement to our mailbox implementation. Hang on to the next section, where this problem is addressed.

Buffered Thread Communication

A drawback of a single-slot buffer, a.k.a. mailbox, is that the transfer speed is limited by the processing time of each intermediate thread, because only one message at a time can be in transit between a pair of threads. A multi-slot buffer adds flexibility: it can even out differences in processing time, both between different threads and over time.

This section discusses how to properly implement such a buffer, or, to use a better term, a message queue.

Problem 5 – Why you should use bounded queues

The most obvious representation of a message queue is a linked list (java.util.LinkedList). Below I show you a straightforward implementation. The most important observation is that there is no call to wait in the put method: an unbounded queue is never full, so put never needs to wait.

import java.util.LinkedList;

class UnboundedQueue {

    private LinkedList      theQueue = new LinkedList();

 

    public synchronized void   put(Object msg) {

        theQueue.addLast(msg);

        notifyAll();

    }

 

    public synchronized Object    get() {

        while (theQueue.isEmpty())

            try { wait(); } catch (InterruptedException x) {}

        Object msg = theQueue.removeFirst();

        return msg;

    }

}

The queue is a member of a consumer thread, which receives blocks of bytes representing data messages. A producer creates such blocks and simply sends them to the consumer. I won’t bother to show you the rest of the program; by now you should be quite familiar with producers and consumers. You can find the complete source code at the end of this article.

Let’s run the program and study the output. The producer sends 10000 blocks of 10000 bytes each to the consumer. A separate watcher thread prints some statistics once a second.

>java TimedPipeline -s 10000

Max          = 10000 message

BlockSize    = 10000 bytes

1) Consumer queue size = 163, Memory alloc/total/max = 1/3/128 Mb

2) Consumer queue size = 328, Memory alloc/total/max = 3/5/128 Mb

3) Consumer queue size = 501, Memory alloc/total/max = 5/5/128 Mb

4) Consumer queue size = 662, Memory alloc/total/max = 6/9/128 Mb

5) Consumer queue size = 836, Memory alloc/total/max = 8/9/128 Mb

. . .

35) Consumer queue size = 5975, Memory alloc/total/max = 61/63/128 Mb

36) Consumer queue size = 6070, Memory alloc/total/max = 58/63/128 Mb

37) Consumer queue size = 6246, Memory alloc/total/max = 60/63/128 Mb

38) Consumer queue size = 6426, Memory alloc/total/max = 62/63/128 Mb

Got java.lang.OutOfMemoryError

Consumer queue size = 6565, Memory alloc/total/max = 63/63/128 Mb

As you can see above, the program crashed after 38 seconds because memory was exhausted. Why?

Analysis

Well, the data block size is perhaps unrealistically large, and the JVM settings could have been adjusted to allow a larger heap. However, if you read the source code you will also find a sleep statement in the consumer: the consumer sleeps 50 ms after it has consumed each data block. The producer and the consumer therefore do not work at the same speed, and consequently the message queue grows larger and larger until memory is exhausted.
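A back-of-the-envelope check with the figures from the printout shows that the rate mismatch alone accounts for the crash time. Here λ_p and λ_c are the producer and consumer message rates (the consumer’s 50 ms sleep caps λ_c at 20 messages per second), s = 10 000 bytes is the block size, and M is the heap that can be filled. The growth rate is read off the printout rather than measured, and M is taken as the reported 63 Mb total, assuming Mb means 2^20 bytes.

$$
r = \lambda_p - \lambda_c \approx \frac{6426\ \text{messages}}{38\ \text{s}} \approx 169\ \text{messages/s}
$$

$$
\dot{m} = r \cdot s \approx 169\ \text{messages/s} \times 10^{4}\ \text{B} \approx 1.69 \times 10^{6}\ \text{B/s}
$$

$$
T \approx \frac{M}{\dot{m}} \approx \frac{63 \times 2^{20}\ \text{B}}{1.69 \times 10^{6}\ \text{B/s}} \approx 39\ \text{s}
$$

This is close to the observed 38 seconds. Since T is inversely proportional to s, smaller messages only postpone the crash; as long as λ_p > λ_c, T stays finite.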

If we reduce the message size and increase the JVM heap, the program will run longer, perhaps for hours or days, but it will eventually crash for the same reason as above. On my machine, with a message size of 100 bytes, it took 47 minutes to crash.

The root cause is the difference in processing time between the producer and the consumer. In this demo program it is a fixed sleep time, but in a real application the processing time varies naturally with the tasks being performed.

If an application that uses an unbounded buffer must run 24/7, it is quite unpleasant if it crashes once a week. What we have is a resource (memory) leak, similar to those common in C/C++ programs.

Unbounded message queues as the basic primitive for thread communication can be a burden for a programming language. The real-time programming language Erlang, developed at Ericsson in the late 1980s and early 1990s, suffered from this problem. Erlang has many similarities with Java, apart from its syntax, which is derived from Prolog. In response to many reports of memory leaks, a platform was eventually developed on top of the language, with bounded communication as one of its primary objectives.

Solution

We can easily solve the problem by re-introducing a call to wait in the put method. The new kind of buffer is a bounded message queue: a producer that calls put is suspended while the queue is full. Our previous mailbox is simply a bounded queue with a single slot.

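There are several ways to realize a bounded queue. The simplest is to keep the implementation above, compare the queue size against a predefined maximum size in put, and call wait if needed. Because a producer may now be waiting, get must also call notifyAll after removing a message, otherwise the waiting producer is never resumed.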

public synchronized void   put(Object msg) {

    while (theQueue.size() >= maxSize)

        try { wait(); } catch (InterruptedException x) {}           

    theQueue.addLast(msg);

    notifyAll();

}

The drawback might be a slight performance penalty compared with the alternative implementation I show you below, which uses an array as a circular buffer. In addition, this alternative opens up some extensions I will discuss later in this article.

class BoundedQueue {

    private Object[]        theQueue;

    private int             putIdx = 0, getIdx = 0, size = 0;

 

    public BoundedQueue(int nSlots) {

        theQueue = new Object[nSlots];

    }

 

    public synchronized void put(Object msg) {

        while (isFull())

            try{ wait(); }catch(InterruptedException e) {}

        insert(msg);       

        notifyAll();

    }

    public synchronized Object get() {

        while (isEmpty())

            try{ wait(); }catch(InterruptedException e) {}

        Object msg = remove();

        notifyAll();

        return msg;

    }

 

    private boolean isFull()  {return size >= theQueue.length;}

    private boolean isEmpty() {return size == 0;}

 

    private void    insert(Object msg) {

        theQueue[putIdx++] = msg;

        putIdx = putIdx % theQueue.length;

        size++;

    }

    private Object  remove() {

        Object msg = theQueue[getIdx++];

        getIdx = getIdx % theQueue.length;

        size--;

        return msg;

    }

}
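To see the bound in action without the article’s TimedPipeline program, here is a small self-contained sketch in the spirit of the BoundedQueue above. The class name ProbedBoundedQueue and its maxObservedSize probe are hypothetical additions for this demo, and BoundedQueueDemo pairs a fast producer with a deliberately slow consumer. Unlike BoundedQueue, it also clears each slot on removal so that a consumed message can be garbage-collected.

```java
import java.util.ArrayList;
import java.util.List;

// Circular-buffer bounded queue with a probe that records the largest size reached.
class ProbedBoundedQueue {
    private final Object[] slots;
    private int putIdx = 0, getIdx = 0, size = 0;
    private int maxObservedSize = 0;

    ProbedBoundedQueue(int nSlots) { slots = new Object[nSlots]; }

    public synchronized void put(Object msg) throws InterruptedException {
        while (size >= slots.length) wait();     // suspend the producer while full
        slots[putIdx] = msg;
        putIdx = (putIdx + 1) % slots.length;
        size++;
        maxObservedSize = Math.max(maxObservedSize, size);
        notifyAll();
    }

    public synchronized Object get() throws InterruptedException {
        while (size == 0) wait();                // suspend the consumer while empty
        Object msg = slots[getIdx];
        slots[getIdx] = null;                    // drop the reference to the consumed message
        getIdx = (getIdx + 1) % slots.length;
        size--;
        notifyAll();                             // a producer may be waiting for space
        return msg;
    }

    public synchronized int maxObservedSize() { return maxObservedSize; }
}

class BoundedQueueDemo {
    // Fast producer (this thread), slow consumer; returns messages in arrival order.
    static List<Integer> run(final ProbedBoundedQueue q, final int count) {
        final List<Integer> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add((Integer) q.get());
                    Thread.sleep(1);             // consumer is slower than the producer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < count; i++) q.put(i);   // blocks whenever the queue is full
            consumer.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        ProbedBoundedQueue q = new ProbedBoundedQueue(4);
        List<Integer> got = run(q, 200);
        System.out.println("received " + got.size() + " messages, max queue size " + q.maxObservedSize());
    }
}
```

However fast the producer is, the queue never holds more than its four slots; the producer is simply suspended until the consumer catches up, and all messages arrive in order.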

Running the program again, now with a bounded buffer, gives this output.

>java TimedPipeline -s 10000 +b

Max          = 10000 message

BlockSize    = 10000 bytes

QueueSize    = 32 slots

BoundedQueue = true

1) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb

2) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb

3) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb

4) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb

5) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb

. . .

506) Consumer queue size = 32, Memory alloc/total/max = 1/1/128 Mb

507) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb

508) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb

509) Consumer queue size = 32, Memory alloc/total/max = 0/1/128 Mb

Producer terminated

510) Consumer queue size = 21, Memory alloc/total/max = 0/1/128 Mb

511) Consumer queue size = 1, Memory alloc/total/max = 0/1/128 Mb

Consumer terminated

This problem clearly illustrates that you should choose a bounded message queue whenever two or more threads communicate. If the flow of messages is unbounded, you must regulate it so that it does not exhaust the available memory, and a bounded queue does exactly that by suspending the producer. On the other hand, if the flow of messages is bounded, that is, if there is a well-defined upper bound on the number of outstanding messages, you can use an unbounded queue instead.

Using a bounded message queue covers most of your thread communication needs, except for some real-time situations that I will discuss next. Follow me on to the next section.

Problem 6 – Why you sometimes can’t suspend the sender, when the queue is full

Pretend that your application is a life-sustaining system, monitoring heartbeats and reacting to changes in the rhythm. There might be a producer reading a medical device and sending data messages to a consumer that interprets the information and reacts to it. If the message queue becomes full and the producer is suspended, important information might get lost. In a critical moment, the last few seconds of heartbeats are more important than those before.

Another, less extreme, example is when the producer is not a thread but an interrupt routine. In Java, you can use the Java Communications API to read and write over serial and parallel connections, or you can hand-write something in C/C++ using the Java Native Interface (JNI). An interrupt routine in C might invoke a callback method in Java, using JNI, and this callback method can then insert a message into a queue. The problem here is that you cannot easily suspend the interrupt routine or the callback method without resorting to disabling the interrupt, which might be cumbersome or inappropriate.

The two scenarios above call for a solution where old messages in the queue can be thrown away in favour of new ones, instead of suspending the producer.

However, what if we really want to keep the old messages in the queue, and new messages can be re-sent later, thanks to some application-level timeout mechanism? That calls for a variant solution, where new messages are rejected instead.

The solution is three different implementations of the put method, realised in a base class (bounded queue) and two sub-classes (skip new and skip old messages respectively). I show you the three put methods below and direct you to the source code if you want to read all of it.

public synchronized void    put(Object msg) throws InterruptedException {

    performPut(msg);

    notifyAll();

}

 

//Suspend sender – class BoundedQueue

protected void performPut(Object msg) throws InterruptedException {

    while (isFull())

        wait();

    insert(msg);

    size++;

}

 

//Skip old messages – class BoundedQueueSkipOld

protected void performPut(Object msg) throws InterruptedException {

    if (isFull()) {

        getIdx = next(getIdx); //move the get index (overwrite)

    } else {

        size++;

    }

    insert(msg);

}

 

//Skip new messages – class BoundedQueueSkipNew

protected void performPut(Object msg) throws InterruptedException {

    if (isFull()) return; //skip msg if it’s full

    insert(msg);

    size++;

}

 

//Helper methods

public synchronized boolean isFull() {

    return size >= queue.length;

}

protected void insert(Object msg) {

    queue[putIdx] = msg;

    putIdx = next(putIdx);

}

protected int next(int idx) {

    return (idx + 1) % queue.length;

}
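The three policies can be compared side by side in a single-threaded sketch. To keep it short, it folds them into one hypothetical PolicyQueue class selected by an enum, rather than the base class and two subclasses used in the article; the enum Policy and the fillAndDrain helper are assumptions made for this demo only.

```java
import java.util.ArrayList;
import java.util.List;

// Fixed-size ring buffer with the three full-queue policies discussed above.
class PolicyQueue {
    enum Policy { SUSPEND_SENDER, SKIP_OLD, SKIP_NEW }

    private final Object[] queue;
    private final Policy policy;
    private int putIdx = 0, getIdx = 0, size = 0;

    PolicyQueue(int n, Policy policy) {
        this.queue = new Object[n];
        this.policy = policy;
    }

    public synchronized void put(Object msg) throws InterruptedException {
        if (size >= queue.length) {
            switch (policy) {
                case SUSPEND_SENDER:
                    while (size >= queue.length) wait();   // wait for a free slot
                    break;
                case SKIP_OLD:
                    getIdx = next(getIdx);                 // drop the oldest message
                    size--;
                    break;
                case SKIP_NEW:
                    return;                                // drop the new message
            }
        }
        queue[putIdx] = msg;
        putIdx = next(putIdx);
        size++;
        notifyAll();
    }

    public synchronized Object get() throws InterruptedException {
        while (size == 0) wait();
        Object msg = queue[getIdx];
        queue[getIdx] = null;
        getIdx = next(getIdx);
        size--;
        notifyAll();
        return msg;
    }

    public synchronized int getSize() { return size; }

    private int next(int idx) { return (idx + 1) % queue.length; }

    // Puts 1..count into a fresh queue of capacity n from this one thread, then empties it.
    // With SUSPEND_SENDER, count must not exceed n, or this thread would wait forever.
    static List<Object> fillAndDrain(int n, Policy p, int count) {
        PolicyQueue q = new PolicyQueue(n, p);
        List<Object> out = new ArrayList<>();
        try {
            for (int i = 1; i <= count; i++) q.put(i);
            while (q.getSize() > 0) out.add(q.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return out;
    }
}
```

With a capacity of 3 and the messages 1 to 5, skip-old keeps 3, 4, 5 while skip-new keeps 1, 2, 3. When the queue is full, putIdx and getIdx point at the same slot, so skip-old overwrites exactly the oldest message.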

A small class library for thread message queues

The discussion in this section naturally leads to a class hierarchy representing the different flavours of message queues. You can find a class diagram below, which shows the structure of a small class library for thread message queues. The basic abstraction is an interface (MessageQueue), implemented by an abstract class that provides common functionality and template methods. The abstract class is then extended into two classes, one realizing an unbounded queue and the other a bounded queue. The bounded queue is further specialized into two classes that differ in how they treat messages when the queue is full: they either skip new messages or skip old messages. You can find the Java code of the library below, after the diagram.

 

public interface MessageQueue {

    public void      put(Object msg) throws InterruptedException;

    public Object    get() throws InterruptedException;

    public boolean   isEmpty();

    public boolean   isFull();

    public int       getSize();

}

 

public abstract class AbstractMessageQueue implements MessageQueue {

    public synchronized void    put(Object msg) throws InterruptedException {

        performPut(msg);

        notifyAll();

    }

    public synchronized Object  get() throws InterruptedException {

        Object msg = performGet();

        notifyAll();

        return msg;

    }

 

    public synchronized int     getSize() {return performSize();}

    public synchronized boolean isEmpty() {return (getSize() == 0);}

    public synchronized boolean isFull()  {return false;}

 

    protected abstract void   performPut(Object msg) throws InterruptedException;

    protected abstract Object performGet()           throws InterruptedException;

    protected abstract int    performSize();

}

 

import java.util.LinkedList;

public class UnboundedMessageQueue extends AbstractMessageQueue {

    private LinkedList queue = new LinkedList();

 

    public UnboundedMessageQueue() {}

 

    protected void   performPut(Object msg) throws InterruptedException {

        queue.addLast(msg);

    }

    protected Object performGet() throws InterruptedException {

        while (queue.isEmpty()) wait();

        return queue.removeFirst();

    }

    protected int performSize() {return queue.size();}

}

 

public class BoundedMessageQueue extends AbstractMessageQueue {

    protected Object[] queue;

    protected int      putIdx = 0, getIdx = 0, size = 0;

 

    public BoundedMessageQueue()      {this(32); }

    public BoundedMessageQueue(int n) {queue = new Object[n];}

 

    protected void performPut(Object msg) throws InterruptedException {

        while (isFull()) wait();

        insert(msg);

        size++;

    }

    protected Object performGet() throws InterruptedException {

        while (isEmpty()) wait();

        Object msg = remove();

        size--;

        return msg;

    }

    protected void insert(Object msg) {

        queue[putIdx] = msg;

        putIdx = next(putIdx);

    }

    protected Object remove() {

        Object msg = queue[getIdx];

        getIdx = next(getIdx);

        return msg;

    }

    public synchronized boolean isFull() {return size >= queue.length;}

    protected int performSize()          {return size; }

    protected int next(int idx)          {return (idx+1) % queue.length;}

}

 

public class BoundedMessageQueueSkipOld extends BoundedMessageQueue {

    public BoundedMessageQueueSkipOld()      {}

    public BoundedMessageQueueSkipOld(int n) {super(n);}

 

    protected void performPut(Object msg) throws InterruptedException {

        if (isFull()) {

            getIdx = next(getIdx);

        } else {

            size++;

        }

        insert(msg);

    }

}

 

public class BoundedMessageQueueSkipNew extends BoundedMessageQueue {

    public BoundedMessageQueueSkipNew()      {}

    public BoundedMessageQueueSkipNew(int n) {super(n);}

 

    protected void performPut(Object msg) throws InterruptedException {

        if (isFull()) return;

        insert(msg);

        size++;

    }

}

TMI – Thread Method Invocation

This section introduces the concept of Thread Method Invocation (TMI), which lies somewhere between local method invocation (LMI) and remote method invocation (RMI).

LMI is the normal semantics when a method of an object is called. RMI, on the other hand, means calling a method of an object in another address space. LMI is performed within the context (call stack) of a single thread, whereas TMI involves two different thread contexts.

Problem 7 – Why you should consider Thread Method Invocation (TMI)

All the thread programs in this article so far have used a push model of thread communication: the calling thread sends something to the thread being called. But how can we go the other way around, so that the caller receives data back? Moreover, how can the calling thread both send and receive data?

These semantics are well known from local method invocation, and from remote method invocation (RMI) as well.

Pretend that we need a computation thread responsible for computing Fibonacci numbers. The Fibonacci numbers are defined as $F_1 = F_2 = 1$ and $F_n = F_{n-1} + F_{n-2}$ for $n > 2$.

It would be very convenient if we could invoke the thread using the familiar local method invocation semantics, instead of juggling a pair of mailboxes or queues. Here is a code fragment showing what I mean:

FibonacciServer  fib    = new FibonacciServer();

fib.start();

int              result = fib.compute(10);

Conceptually, the computation behind the compute method is executed in the context of the server thread, not in the context of the calling thread, which simply waits for the result. This very simplified example shows how data is both sent to and received from a thread.

TMI is the basic mechanism for thread communication in the programming language Ada. In Ada, a thread is called a task and TMI is called a rendezvous.

It is not obvious how to realize TMI, because the message queues we have studied so far transfer data in one direction only. The reply message cannot be stored in a member variable of the queue, because several threads might be calling the server, each one suspended and waiting. We must be able to resume the specific sender thread whose message the reply was computed from.

Solution

The solution is to introduce a wrapper message that contains both the sender’s message and a single-slot queue, where the sender thread can wait for the reply message. Have a look at the figure below.

The calling thread (client) creates a wrapper containing its message, puts the wrapper into the server’s message queue, and then calls waitForReplyMessage, which suspends it. Eventually, the server takes the wrapper out of its message queue and extracts the sender’s message by calling getSenderMessage. When the server has finished its computation, it calls putReplyMessage, which inserts a reply message into the reply queue of the wrapper. This resumes the client, which receives the reply message and continues its execution. The message queue of the server contains wrapper messages, one for each waiting sender thread.
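The wrapper idea can be tried out in isolation. The sketch below swaps in java.util.concurrent queues (Java 5 and later, so newer than this article) for the article’s own queue classes; Call, SquareServer and RendezvousDemo are hypothetical names, and a trivial squaring service stands in for the Fibonacci server. The point to notice is that every call carries its own one-slot reply queue, so even with many concurrent clients each one receives the answer to its own request.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Wrapper: the sender's message plus a private one-slot reply queue.
class Call {
    final Object request;
    private final BlockingQueue<Object> reply = new ArrayBlockingQueue<>(1);

    Call(Object request) { this.request = request; }

    void putReply(Object r) throws InterruptedException { reply.put(r); }

    Object waitForReply() throws InterruptedException { return reply.take(); }
}

class SquareServer extends Thread {
    private final BlockingQueue<Call> inbox = new LinkedBlockingQueue<>();

    SquareServer() { setDaemon(true); }          // do not keep the JVM alive when idle

    // Runs in the caller's thread: enqueue the call, then block until the reply arrives.
    public int square(int n) throws InterruptedException {
        Call c = new Call(n);
        inbox.put(c);
        return (Integer) c.waitForReply();
    }

    // Runs in the server's thread: take one call at a time and answer that caller.
    public void run() {
        try {
            while (true) {
                Call c = inbox.take();
                int n = (Integer) c.request;
                c.putReply(n * n);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // interrupted: stop serving
        }
    }
}

class RendezvousDemo {
    // Starts `clients` threads that each call square(0..calls-1); true if every reply matched.
    static boolean run(int clients, final int calls) {
        final SquareServer server = new SquareServer();
        server.start();
        final AtomicBoolean ok = new AtomicBoolean(true);
        Thread[] threads = new Thread[clients];
        for (int c = 0; c < clients; c++) {
            threads[c] = new Thread(() -> {
                try {
                    for (int i = 0; i < calls; i++)
                        if (server.square(i) != i * i) ok.set(false);
                } catch (InterruptedException e) {
                    ok.set(false);
                }
            });
            threads[c].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        server.interrupt();
        return ok.get();
    }
}
```

Because each client blocks in square until its reply arrives, the server’s inbox never holds more calls than there are client threads.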

Let me start with the implementation and then show the code of the Fibonacci server example. First, here is the class RendezvousMessageQueue, containing the inner class MessageWrapper.

class RendezvousMessageQueue {  // not a MessageQueue: its put returns the reply

    private MessageQueue queue = new UnboundedMessageQueue();

 

    public RendezvousMessageQueue() {}

 

    public Object    put(Object msg) throws InterruptedException {

        MessageWrapper   wrapper = new MessageWrapper(msg);

        queue.put(wrapper);

        return wrapper.waitForReplyMessage();

    }

 

    public Object    get() throws InterruptedException {

        return queue.get();

    }

 

    public boolean isEmpty() {return queue.isEmpty();}

    public boolean isFull()  {return false; }

    public int     getSize() {return queue.getSize();}

 

    public class MessageWrapper {

        private Object         msg;

        private MessageQueue   replyQ = new BoundedMessageQueue(1);

 

        protected MessageWrapper(Object msg) {this.msg = msg; }

 

        public Object getSenderMessage()     {return msg; }

 

        public void putReplyMessage(Object msg) throws InterruptedException {

            replyQ.put(msg);

        }

        public Object waitForReplyMessage() throws InterruptedException {

            return replyQ.get();

        }

    }

}

You might ask why I’m using an unbounded queue inside RendezvousMessageQueue, given what I have said in this article about the danger of unbounded queues. The reason is that every sender is suspended until it gets its reply, so the queue can never hold more wrappers than there are threads in the program; the queue size therefore has an upper bound.

With a RendezvousMessageQueue, it is straightforward to implement our Fibonacci server. Below I show you the server and a simple client. As always, you can find the complete source code of this program and all the other programs at the end of this article.

class FibonacciServer extends Thread {

    private RendezvousMessageQueue  inbox = new RendezvousMessageQueue();

 

    public FibonacciServer() {. . .}

 

    public int    compute(int n) throws InterruptedException {

        return ( (Integer)inbox.put( new Integer(n) ) ).intValue();

    }

 

    public void run() {

        try {

            while (true) {

                RendezvousMessageQueue.MessageWrapper  w =

                    (RendezvousMessageQueue.MessageWrapper) inbox.get();

                int  n = ( (Integer)w.getSenderMessage() ).intValue();

                int  f = fib(n);

                w.putReplyMessage( new Integer(f) );

            }

        } catch (InterruptedException e) {

            // interrupted: let the server thread terminate

        }

    }

 

    private int fib(int n) {

        return (n <= 2 ? 1 : fib(n-1) + fib(n-2));

    }

}

 

class Client extends Thread {

    private FibonacciServer  server;

    private int              n;

 

    public Client(int id, FibonacciServer server, int n) {

        super("Client-"+id);

        this.server = server;

        this.n = n;

    }

 

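    public void run() {

        try {

            for (int i = 1; i <= n; i++) {

                System.out.println(getName()

                    + ": fib("+i+") = " + server.compute(i));

            }

        } catch (InterruptedException e) {

            // interrupted while waiting for a reply: stop this client

        }

    }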

}

Running the program with one client gives this output.

>java Fibonacci -c 1 -n 30

Client-1: fib(1) = 1

Client-1: fib(2) = 1

Client-1: fib(3) = 2

Client-1: fib(4) = 3

Client-1: fib(5) = 5

Client-1: fib(6) = 8

Client-1: fib(7) = 13

Client-1: fib(8) = 21

Client-1: fib(9) = 34

Client-1: fib(10) = 55

. . .

Client-1: fib(27) = 196418

Client-1: fib(28) = 317811

Client-1: fib(29) = 514229

Client-1: fib(30) = 832040

Here is an excerpt from another run, with 100 clients.

>java Fibonacci -c 100 -n 30

Client-2: fib(1) = 1

Client-13: fib(1) = 1

Client-3: fib(1) = 1

Client-2: fib(2) = 1

Client-4: fib(1) = 1

Client-5: fib(1) = 1

Client-6: fib(1) = 1

Client-13: fib(2) = 1

Client-7: fib(1) = 1

Client-8: fib(1) = 1

Client-10: fib(1) = 1

Client-3: fib(2) = 1

. . .

Client-16: fib(29) = 514229

Client-80: fib(29) = 514229

Client-90: fib(28) = 317811

Client-100: fib(30) = 832040

Client-16: fib(30) = 832040

Client-80: fib(30) = 832040

Client-90: fib(29) = 514229

Client-93: fib(30) = 832040

Client-90: fib(30) = 832040

We can now extend the class diagram of our library with two new classes: the rendezvous queue and its wrapper class. The updated class diagram is shown below.

In this section we have motivated and implemented thread method invocation (TMI), or rendezvous in Ada parlance. It is now possible to declare a thread method and invoke it from another thread using familiar method call semantics.

However, if a thread has two or more thread methods and we want it to wait for calls on any of them at the same time, there is no simple solution yet. Follow me on to the next section, where I address that problem.

Non-Deterministic Message Acceptance

One of the biggest advantages of thread method invocation (TMI) is type safety. The parameter types, together with the return type, determine what kind of data can be exchanged with the receiving side, and every violation is detected by the compiler instead of slipping through as a run-time bug.

Problem 8 – Why you sometimes must combine TMI with non-determinism

This is all good, until we consider more than one thread method. Let’s pretend we have a small life-sustaining system, where one thread (HeartSampler) samples the heartbeats and another (BreathingSampler) samples the breathing rate. These two threads send their data to the Merger thread for further processing. The public interface of the Merger might look like this:

interface Merger {

    void  heartBeat(float amplitude);

    void  breathing(float pressure, float carbonDioxide);

}

I’m not a medical specialist, so don’t expect me to know what I’m talking about regarding the parameters of the methods in the interface above. Just view it as two methods with some parameters.

The merger can be realized as a thread with two message queues, as shown in the class declaration below. Each queue has a corresponding message wrapper that can contain the sent arguments.

class MergerThread extends Thread implements Merger {

  private MessageQueue    heartQ  = new BoundedMessageQueueSkipOld(100);

  private MessageQueue    breathQ = new BoundedMessageQueueSkipOld(100);

 

  private class HeartMsg {

    float amplitude;

    HeartMsg(float a) {amplitude=a;}

  }

  private class BreathMsg {

    float pressure, carbonDioxide;

    BreathMsg(float p, float c) {pressure=p; carbonDioxide=c;}

  }

 

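  public void  heartBeat(float amplitude) {

    try {

      heartQ.put( new HeartMsg(amplitude) );

    } catch (InterruptedException e) {

      Thread.currentThread().interrupt(); // a SkipOld queue never blocks in put

    }

  }

  public void  breathing(float pressure, float carbonDioxide) {

    try {

      breathQ.put( new BreathMsg(pressure, carbonDioxide) );

    } catch (InterruptedException e) {

      Thread.currentThread().interrupt();

    }

  }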

 

  public void    run() {. . .}

}

So far, we have seen how to receive messages from one queue at a time. Below I show you one (naive) way of receiving messages from two queues.

while (. . .) {

  HeartMsg  h = (HeartMsg)heartQ.get();

  BreathMsg b = (BreathMsg)breathQ.get();

  . . .

}

The problem with the code fragment above is that it requires the heart rate to be equal to the breathing rate, an assumption that is just plain wrong. The code states that we should first receive a heartbeat message and then wait for and receive a breathing message. Try holding your breath and counting the number of heartbeats during that time interval. Then do it again and compare with another person. The heartbeat rate and the breathing rate are independent and cannot be predicted accurately; we must therefore be able to wait for either a heartbeat message or a breathing message.

Analysis

This problem was identified long ago in the real-time programming language design community. It is called non-deterministic message acceptance, because it is not determined in advance which message type will be accepted next.

The Ada language is based on message passing between threads (called tasks) using thread method invocation (called rendezvous). Each thread message (called an entry) can be received (called accepted) on its own in an ACCEPT statement, or together with others in a SELECT statement containing several ACCEPT statements. The SELECT statement lists all message types that the thread can accept at that particular point in its execution path.

The SELECT statement suspends the executing (server) task if no messages have been sent. The first message to arrive resumes the server, and execution continues from the corresponding ACCEPT statement. If messages of more than one type have been sent, the SELECT statement must choose one ACCEPT statement. Ada leaves this choice arbitrary, but it should be made in a fair manner; that is, the same ACCEPT must not be chosen every time.

The most straightforward implementation arranges the queues in a (logical) circle and searches it for a non-empty queue, going at most one full turn. The next search starts from the next queue in the circle.
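The circular search can be tried out in isolation, without any threads. The sketch below is a hypothetical stand-in: RoundRobinSelector and select are illustrative names, and plain java.util.ArrayDeque objects play the role of message queues. It checks each queue at most once per call and moves the starting point one step along the circle after every search, so a queue that is never empty cannot starve the others.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of fair, circular queue selection.
// Not thread-safe: in a real group this would run while holding the group's lock.
class RoundRobinSelector {
    private final List<ArrayDeque<String>> queues = new ArrayList<>();
    private int start = 0; // index where the next search begins

    void add(ArrayDeque<String> q) { queues.add(q); }

    // Returns the index of a non-empty queue, or -1 if every queue is empty.
    // Each queue is inspected at most once (one full turn of the circle).
    int select() {
        int n = queues.size();
        if (n == 0) return -1;
        int found = -1;
        for (int i = 0; i < n; i++) {
            int idx = (start + i) % n;
            if (!queues.get(idx).isEmpty()) { found = idx; break; }
        }
        start = (start + 1) % n; // next search starts one step further along
        return found;
    }

    public static void main(String[] args) {
        ArrayDeque<String> a = new ArrayDeque<>();
        ArrayDeque<String> b = new ArrayDeque<>();
        for (int i = 0; i < 3; i++) { a.add("a" + i); b.add("b" + i); }
        RoundRobinSelector sel = new RoundRobinSelector();
        sel.add(a);
        sel.add(b);
        StringBuilder order = new StringBuilder();
        int idx;
        while ((idx = sel.select()) >= 0) {
            order.append(idx == 0 ? a.poll() : b.poll()).append(' ');
        }
        System.out.println(order.toString().trim()); // a0 b0 a1 b1 a2 b2
    }
}
```

With both queues non-empty, the selections alternate between them instead of draining the first queue before touching the second.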

Solution

The solution to our merger problem is to implement a mechanism in Java similar to Ada’s SELECT statement. We can’t change the syntax of Java, so we have to rely on an object that realizes the required semantics.

A MessageQueueGroup assembles a set of MessageQueue objects and provides semantics similar to Ada’s SELECT. To implement the group properly, we need to solve two sub-problems. The first is the easy one: managing a java.util.ArrayList and an index pointing to the start position for a circular search.

The other problem is subtler: how to manage synchronization between the different queues and the group object. Think about it. A client thread inserts a message into some queue by calling its put method, which notifies waiting threads by calling notifyAll. The server thread calls a method on the group object to get a non-empty queue; if no queue can be found, the server is suspended by calling wait. The queue and the group are different objects, which means they denote different monitors as well. The figure below illustrates the dilemma. The notifyAll call on the third queue’s lock doesn’t resume the server, because the server is sleeping on the group’s lock.

We can solve this problem by ensuring that all objects agree on one single lock. Here the synchronized block comes to the rescue. In the solution below, all queues use the lock of the group object. This is set up when a queue is added to a group and reset when it’s removed again.
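A stripped-down sketch of the shared-lock idea follows, assuming two hypothetical classes, SharedLockQueue and SharedLockDemo, rather than the article’s library. Both queues synchronize, wait and notify on the same lock object handed to them at construction, so a server that waits on that lock until either queue is non-empty is woken by a put on either queue.

```java
import java.util.ArrayDeque;

// Hypothetical demo: a server waits until either of two queues has a message.
class SharedLockDemo {
    // Blocks until q1 or q2 is non-empty, then removes and returns one message.
    static Object getFromEither(Object lock, SharedLockQueue q1, SharedLockQueue q2)
            throws InterruptedException {
        synchronized (lock) {
            while (q1.isEmpty() && q2.isEmpty()) {
                lock.wait(); // releases the shared lock while sleeping
            }
            return !q1.isEmpty() ? q1.take() : q2.take();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object(); // the single, agreed-upon lock
        SharedLockQueue heart = new SharedLockQueue(lock);
        SharedLockQueue breath = new SharedLockQueue(lock);
        Thread sender = new Thread(() -> breath.put("breath"));
        sender.start();
        System.out.println(getFromEither(lock, heart, breath)); // breath
        sender.join();
    }
}

// Hypothetical minimal queue that synchronizes on an externally supplied lock.
class SharedLockQueue {
    private final ArrayDeque<Object> items = new ArrayDeque<>();
    private final Object lock;

    SharedLockQueue(Object lock) { this.lock = lock; }

    void put(Object msg) {
        synchronized (lock) {
            items.addLast(msg);
            lock.notifyAll(); // wakes every thread waiting on the shared lock
        }
    }

    // The two methods below must be called while holding the shared lock.
    boolean isEmpty() { return items.isEmpty(); }
    Object take() { return items.pollFirst(); }
}
```

If each queue instead notified on its own monitor, the server waiting on a different object would never see the notification, which is exactly the dilemma described above.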

All message queue classes inherit from AbstractMessageQueue, which implements both the put and the get method using the Template Method design pattern. The queue logic is delegated to a subclass, but the synchronization logic is managed by the class itself. This class is modified to use a dedicated object for synchronization instead of its own lock. The lock object is initialised to this, but can be changed with a setter method. All public methods use a synchronized block with the lock object as argument, instead of being declared as synchronized methods. The code fragment below shows the changes.

public abstract class AbstractMessageQueue implements MessageQueue {

    protected Object synchObj = this;

    . . .

    protected void setSynchObject(Object obj) {

        synchObj = obj;

    }

    public Object get() throws InterruptedException {

        synchronized (synchObj) {

            Object msg = performGet();

            synchObj.notifyAll();

            return msg;

        }

    }

    . . .

}

The RendezvousMessageQueue class needs a slightly different change, because it uses an inner queue. It overrides setSynchObject and delegates the synchronization object to the inner queue.

public class RendezvousMessageQueue extends AbstractMessageQueue {

    private MessageQueue    queue = new UnboundedMessageQueue();

    . . .

    protected void setSynchObject(Object obj) {

        ( (AbstractMessageQueue)queue ).setSynchObject(obj);

    }

    . . .

}

I show you the code for the MessageQueueGroup class below. You can find the source code for this class and all other classes at the end of this article.

import java.util.ArrayList;

import java.util.List;



public class MessageQueueGroup {

    private List    queueSet       = new ArrayList();

    private int     startSearchIdx = 0;

 

    public MessageQueueGroup() {}

 

    public synchronized void    add(MessageQueue queue) {

        if (queueSet.contains(queue)) return;

 

        ( (AbstractMessageQueue)queue ).setSynchObject(this);

        queueSet.add(queue);

    }

 

    public synchronized void    remove(MessageQueue queue) {

        if (!queue.isEmpty()) {

            throw new IllegalArgumentException("...");

        }

        if (queueSet.contains(queue)) {

            queueSet.remove(queue);

            ( (AbstractMessageQueue)queue ).setSynchObject(queue);

        }

    }

 

    public synchronized MessageQueue getNonEmptyQueue()

      throws InterruptedException

    {

        while (true) {

            MessageQueue queue = findQueue(startSearchIdx);

            startSearchIdx     = next(startSearchIdx);

            if (queue != null) return queue;

 

            wait(); //Suspend the receiving side (server)

        }

    }

 

    protected MessageQueue findQueue(int startIdx) {

        if (queueSet.isEmpty())

            return null;

 

        int idx = startIdx % queueSet.size(), endIdx = idx; // the group may have shrunk since startIdx was computed

        do {

            MessageQueue q = (MessageQueue) queueSet.get(idx);

            if (!q.isEmpty()) return q;

 

            idx = next(idx);

        } while (idx != endIdx);

 

        return null;

    }

 

    protected int next(int idx) {

        return queueSet.isEmpty() ? 0 : (idx + 1) % queueSet.size(); // avoid % 0 on an empty group

    }

}

Using our new group object, we can return to the merger thread problem at the beginning of this section. The merger thread creates a MessageQueueGroup object, and its constructor adds the two queues to it.

. . .

private MessageQueue       heartQ  = new BoundedMessageQueueSkipOld(100);

private MessageQueue       breathQ = new BoundedMessageQueueSkipOld(100);

private MessageQueueGroup  group   = new MessageQueueGroup();

 

public MergerThread() {

    group.add(heartQ);

    group.add(breathQ);

}

We can then rewrite the loop in the run method of MergerThread like this:

while (. . .) {

  MessageQueue  q   = group.getNonEmptyQueue();

  Object        msg = q.get();

  if (q == heartQ) {

    HeartMsg  hmsg = (HeartMsg)msg;

    . . .

  } else {

    BreathMsg bmsg = (BreathMsg)msg;

    . . .

  }

  . . .

}

The dispatching of the message acceptance logic is performed by a simple if-statement, but if there are many message types, it’s better to use a switch statement together with a fixed integer value for each message type.
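A minimal sketch of the switch-based dispatch, assuming each message carries a fixed integer type id. TypedMsg, Dispatcher and the HEART and BREATH constants are hypothetical names chosen for illustration; the counters stand in for real acceptance logic.

```java
// Hypothetical dispatcher: one switch case per message type id.
class Dispatcher {
    int heartCount = 0;
    int breathCount = 0;

    void dispatch(TypedMsg m) {
        switch (m.type) {
            case TypedMsg.HEART:   // acceptance logic for heartbeat messages
                heartCount++;
                break;
            case TypedMsg.BREATH:  // acceptance logic for breathing messages
                breathCount++;
                break;
            default:
                throw new IllegalArgumentException("unknown message type " + m.type);
        }
    }

    public static void main(String[] args) {
        Dispatcher d = new Dispatcher();
        d.dispatch(new TypedMsg(TypedMsg.HEART, 1.0f));
        d.dispatch(new TypedMsg(TypedMsg.BREATH, 0.5f));
        d.dispatch(new TypedMsg(TypedMsg.HEART, 1.1f));
        System.out.println(d.heartCount + " " + d.breathCount); // 2 1
    }
}

// Hypothetical message carrying a fixed integer type id and a payload.
class TypedMsg {
    static final int HEART  = 0;
    static final int BREATH = 1;

    final int type;
    final float value;

    TypedMsg(int type, float value) { this.type = type; this.value = value; }
}
```

The type ids are declared static final and initialised with literals because case labels must be compile-time constants.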

Let’s write a small program that demonstrates non-deterministic message acceptance in practice. I have deliberately replaced the heartbeat and breathing threads with two simple number-counting threads, where the first sends odd numbers and the second even numbers. The merger thread adds all received numbers together.

Here are the relevant parts of the program. I have made some simplifications for clarity. You can find the complete source code at the end of this article.

interface Merger {

    void    putOddNumber(int n);

    void    putEvenNumber(int n);

}

 

class MergerThread extends Thread implements Merger {

    private MessageQueue        oddQ  = new BoundedMessageQueue(10);

    private MessageQueue        evenQ = new BoundedMessageQueue(10);

    private MessageQueueGroup   group = new MessageQueueGroup();

    private int                 sum   = 0;

 

    public MergerThread() {

        group.add(oddQ);

        group.add(evenQ);

        start();

    }

 

    public void putOddNumber(int n)  {oddQ.put (new Integer(n));}

    public void putEvenNumber(int n) {evenQ.put(new Integer(n));}

    public int  getSum()             {return sum;}

 

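    public void run() {

       try {

           while (true) {

               MessageQueue  q = group.getNonEmptyQueue();

               int           n = ( (Integer)q.get() ).intValue();

               sum += n;

           }

       } catch (InterruptedException e) {

           // Interrupted while waiting: stop merging and let the thread end

       }

    }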

}

 

private static class ClientThread extends Thread {

    private Merger    merger;

    private boolean   sendOddNumbers;

    private int       max;

 

    public ClientThread(int max, boolean sendOddNumbers, Merger merger) {

       this.max            = max;

       this.merger         = merger;

       this.sendOddNumbers = sendOddNumbers;

       start();

   }

 

   public void run() {

       try {

           for (int k = (sendOddNumbers ? 1 : 2); k <= max; k += 2) {

               if (sendOddNumbers) {merger.putOddNumber(k);  sleep(5);}

               else                {merger.putEvenNumber(k); sleep(15);}

           }

       } catch (InterruptedException e) {

           // sleep() was interrupted: stop sending numbers

       }

   }

}

Running the program with the numbers from 1 to 10000 gives the output shown below.

>java NumberMerger -m 10000

max = 10000

--------------

Sum     : 50005000

Expected: 50005000

We are almost through; however, there is one glitch left. Sometimes you can’t wait for all queues in a group. Follow me to the next section, where this problem will be dissected.

Problem 9 – Why you might need guards when you are using non-determinism

The SELECT statement in Ada and our MessageQueueGroup class in Java are terrific operators in the context of message passing, because they allow a thread to wait for different types of messages concurrently. However, in certain situations it’s not appropriate to wait for every possible message type. The application state of a thread may dictate that a subset of the available types should be enabled instead. Let’s jump into an example to clarify what I mean.

The POTS (Plain Old Telephone Service) algorithm is used in telephone branch exchange systems; it controls a single terminal (phone) and its interactions with other terminals. It’s usually expressed as a state machine, with input signals such as off-hook, button(b) and on-hook, and state transitions based on those inputs. Describing all of POTS is out of scope for this article, because it’s far too complex, but I can give you a slight flavour of it.

A terminal starts out in the idle state and can receive an off-hook signal when the user lifts the handset. The controller then attaches the “ready tone” generator to the terminal, so the user knows when to start dialling, and transitions to the waitForFirstDigit state, where it can receive either a button(b) signal or an on-hook signal.

If the controller receives an on-hook signal, because the user laid down the handset again, it detaches the generator and transitions to the idle state again. If the controller receives a button(b) signal instead, it detaches the generator, sends the button id to a number analyser and transitions to the waitForMoreDigits state.

In this state, it accepts both button(b) signals and on-hook signals. The number analyser examines the digit sequence and determines whether it’s incomplete, complete, or invalid. Depending on the outcome, the controller takes different transitions.
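The part of POTS described above can be summarised as a table of which signals each state enables. The sketch below is hypothetical: the enum and method names are made up, the tone generator and number analyser are left out, and it assumes that an on-hook signal in waitForMoreDigits also returns the controller to idle. Its point is that the set of acceptable signals depends on the state, which is what guards have to express.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, heavily simplified POTS controller: only the transitions
// described in the text, with tone generator and number analyser omitted.
class PotsController {
    enum Signal { OFF_HOOK, BUTTON, ON_HOOK }
    enum State { IDLE, WAIT_FOR_FIRST_DIGIT, WAIT_FOR_MORE_DIGITS }

    private State state = State.IDLE;

    State state() { return state; }

    // Signals whose guards would be enabled in the current state.
    Set<Signal> enabled() {
        if (state == State.IDLE) return EnumSet.of(Signal.OFF_HOOK);
        // Both waiting states accept a digit or the handset being put down.
        return EnumSet.of(Signal.BUTTON, Signal.ON_HOOK);
    }

    void accept(Signal s) {
        if (!enabled().contains(s))
            throw new IllegalStateException(s + " is not enabled in state " + state);
        switch (s) {
            case OFF_HOOK: // attach ready tone (omitted)
                state = State.WAIT_FOR_FIRST_DIGIT;
                break;
            case BUTTON:   // detach tone, pass digit to the analyser (omitted);
                           // the analyser's complete/invalid outcomes are ignored here
                state = State.WAIT_FOR_MORE_DIGITS;
                break;
            case ON_HOOK:  // assumed: hanging up always returns to idle
                state = State.IDLE;
                break;
        }
    }

    public static void main(String[] args) {
        PotsController c = new PotsController();
        c.accept(Signal.OFF_HOOK);
        c.accept(Signal.BUTTON);
        System.out.println(c.state() + " " + c.enabled()); // WAIT_FOR_MORE_DIGITS [BUTTON, ON_HOOK]
    }
}
```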

If we want to model the controller as a message-passing thread, it would be very convenient if we could enable and disable different signals based on the controller state.

This problem has been recognized in the real-time language community as well. A guard can enable or disable an entry point (message type). You can find the concept of guards in many message-passing languages, such as Ada, Concurrent C, Erlang and Occam.

We can introduce the idea in our message queue library, using an interface.

public interface Guard {

    public boolean isEnabled();

}

A guard object can then be linked to a queue when the queue is added to a message queue group. The search for a non-empty queue must also be modified to check whether the queue’s guard is enabled. The code fragments below show the changes in the MessageQueueGroup class.

public synchronized void    add(MessageQueue queue, Guard guard) {

    GuardedQueue q = new GuardedQueue(queue, guard);

    if (queueSet.contains(q)) return;

 

    ( (AbstractMessageQueue)queue ).setSynchObject(this);

    queueSet.add(q);

}

 

protected MessageQueue findQueue(int startIdx) {

    if (queueSet.isEmpty())

        return null;

 

    int idx = startIdx % queueSet.size(), endIdx = idx; // the group may have shrunk since startIdx was computed

    do {

        GuardedQueue q = (GuardedQueue) queueSet.get(idx);

        if (q.isReady()) return q.queue;

        idx = next(idx);

    } while (idx != endIdx);

    return null;

}

 

protected class GuardedQueue {

    MessageQueue queue;

    Guard guard;

 

    public GuardedQueue(MessageQueue queue, Guard guard) {

        this.queue = queue;

        this.guard = guard;

    }

    public boolean isReady() {

        return (queue.isEmpty() == false)

            && (guard == null || guard.isEnabled());

    }

    public boolean equals(Object other) {. . .}

}

Finally, I want to illustrate the use of guards with a message queue group. Rather than a realistic example, where the problem domain might obscure the important points, I prefer a toy example.

The idea is a variant of the previous program, with two client threads sending streams of numbers, one with odd numbers and the other with even numbers. As before, a server thread is responsible for accumulating the sum. The difference is a new active mailbox thread in between. The logic of the active mailbox should be familiar if you have read the whole article so far, so we can concentrate on the synchronization logic instead, which is far from trivial.

The entry points of the mailbox (called Buffer) are described by the interface below.

interface Buffer {

   void    put(int x) throws InterruptedException;

   int     get()      throws InterruptedException;

   void    quit()     throws InterruptedException;

}

The client and server are trivial, so I will only show you the main loop of each thread. The main loop of the client looks like this:

for (int k = (sendOddNumbers ? 1 : 2); k <= max; k += 2) {

    buffer.put(k);

}

The main loop of the server looks like this:

int k;

while ((k = buffer.get()) > 0) {

    sum += k;

}

In both cases, buffer is a reference to the active buffer thread. All three thread methods of Buffer are of rendezvous type; note that the get method returns a value.

The buffer thread is more complex than anything we have seen so far in this article. It contains three queues, one for each thread method, and a queue group object responsible for the non-deterministic selection of a non-empty queue. In addition, it contains variables for the mailbox logic. Here is the first part of the thread. As usual, you can find the complete source code at the end of this article.

class BufferThread extends Thread implements Buffer {

    private static final int            PUT   = 0;

    private static final int            GET   = 1;

    private static final int            QUIT  = 2;

    private RendezvousMessageQueue      putQ  = new RendezvousMessageQueue();

    private RendezvousMessageQueue      getQ  = new RendezvousMessageQueue();

    private RendezvousMessageQueue      quitQ = new RendezvousMessageQueue();

    private MessageQueueGroup           group = new MessageQueueGroup();

    private Message                     msg   = null;

    private boolean                     full  = false;

 

    public BufferThread() {

        group.add(putQ,  new IsEmptyGuard());

        group.add(getQ,  new IsFullGuard());

        group.add(quitQ, new IsEmptyGuard());

        start();

    }

    . . .

Three guards are supplied when the three queues are added to the group. Here is one of the two guard classes.

class IsEmptyGuard implements Guard {

    public boolean isEnabled() {return !full;}

}
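IsEmptyGuard reads the buffer’s field full, so it is presumably an inner class of BufferThread. The second class, IsFullGuard, is not listed here; from its name and its use with getQ it presumably returns full. The self-contained sketch below shows how the two guards flip as the single slot is filled and emptied. GuardedBuffer is a hypothetical stand-in with no threads or queues, and Guard is redeclared so the code compiles on its own.

```java
// Hypothetical single-slot buffer state with the two guard classes as inner classes.
class GuardedBuffer {
    private boolean full = false;
    private int value;

    // Inner (non-static) classes can read the enclosing object's 'full' field.
    class IsEmptyGuard implements Guard {
        public boolean isEnabled() { return !full; } // put and quit only when empty
    }

    class IsFullGuard implements Guard {
        public boolean isEnabled() { return full; }  // get only when full
    }

    final Guard putGuard = new IsEmptyGuard();
    final Guard getGuard = new IsFullGuard();

    void put(int x) { value = x; full = true; }
    int get() { full = false; return value; }

    public static void main(String[] args) {
        GuardedBuffer b = new GuardedBuffer();
        System.out.println(b.putGuard.isEnabled() + " " + b.getGuard.isEnabled()); // true false
        b.put(42);
        System.out.println(b.putGuard.isEnabled() + " " + b.getGuard.isEnabled()); // false true
    }
}

// Same single-method interface as the article's Guard, redeclared so this compiles alone.
interface Guard {
    boolean isEnabled();
}
```

Exactly one of the two guards is enabled at any time, so the buffer alternates strictly between accepting a put and accepting a get.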

Next, I show you the client side of the three thread methods, where messages are inserted into the queues.

public void put(int x) throws InterruptedException {

    putQ.put(new Message(PUT, x));

}

public int get() throws InterruptedException {

    Message  reply = (Message) getQ.put(new Message(GET));

    return reply.value;

}

public void quit() throws InterruptedException {

    quitQ.put(new Message(QUIT));

}
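The client-side methods above create Message objects, but the Message class itself is not listed in this section. Judging from its uses (new Message(PUT, x), new Message(GET), reply.value and m.type), a minimal version might look like the sketch below; the field names come from the code, while the exact constructors and the default value 0 are assumptions.

```java
// A guess at the Message class: a type id plus an optional int payload.
class Message {
    final int type;   // PUT, GET or QUIT in BufferThread
    final int value;  // payload; not used by GET and QUIT requests

    Message(int type)            { this(type, 0); }
    Message(int type, int value) { this.type = type; this.value = value; }

    @Override
    public String toString() { return "Message(" + type + ", " + value + ")"; }

    public static void main(String[] args) {
        System.out.println(new Message(0, 7)); // Message(0, 7)
        System.out.println(new Message(1));    // Message(1, 0)
    }
}
```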

Every inserted message is wrapped in a Message object containing a type id and the message value. When a message is received, the correct acceptance logic is chosen based on the type id. Finally, here is the run method of the buffer thread.

public void run() {

  boolean running = true;

  try {

    while (running) {

      RendezvousMessageQueue  q = (RendezvousMessageQueue)group.getNonEmptyQueue();

      RendezvousMessageQueue.MessageWrapper  w = q.getWrapper();

      Message  m = (Message)w.getSenderMessage();

      Message  r = null;

      switch (m.type) {

          case PUT:

              full = true;

              msg  = m;

          break;

          case GET:

              full = false;

              r    = msg;

          break;

          case QUIT:

              running = false;

          break;

      }

      w.putReplyMessage(r);

    }

  } catch (InterruptedException e) {

    // Interrupted while waiting for a message: terminate the buffer thread

  }

}

The first statement inside the loop suspends the thread until a message arrives; the thread then resumes and the non-empty queue is returned. The queue is of rendezvous type and contains a wrapper object, which is extracted in the second statement. The wrapper contains the sender’s message and a single-slot queue in which the sender thread waits for the reply message. The sender’s message is extracted from the wrapper in the third statement.

The switch statement dispatches on the message type based on the type id in the sender’s message. Each case entry manipulates the application state as you can see above.

The last thing that happens, before the loop starts over, is that the sender gets a reply. The reply message contains data only for the GET method; in the other cases it is null. The important thing to consider is the synchronization between sender and receiver: the sender is suspended until the receiver calls putReplyMessage.
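The wrapper mechanism can be sketched on its own. The Wrapper class below is a hypothetical simplification, not the library’s RendezvousMessageQueue.MessageWrapper: it holds the sender’s message and a single reply slot, and the sender blocks in awaitReply (an illustrative name) until the receiver calls putReplyMessage. A separate flag marks the slot as filled, so a null reply, as for PUT and QUIT, still releases the sender.

```java
// Hypothetical demo: one sender waits for one receiver's reply.
class RendezvousDemo {
    public static void main(String[] args) throws InterruptedException {
        Wrapper w = new Wrapper("GET");
        // Receiver thread: inspect the request, then reply.
        Thread receiver = new Thread(() -> w.putReplyMessage(w.getSenderMessage() + " -> 42"));
        receiver.start();
        System.out.println(w.awaitReply()); // GET -> 42
        receiver.join();
    }
}

// Hypothetical single-use rendezvous wrapper: the request plus one reply slot.
class Wrapper {
    private final Object senderMessage;
    private Object reply;
    private boolean replied = false; // distinguishes "no reply yet" from a null reply

    Wrapper(Object senderMessage) { this.senderMessage = senderMessage; }

    Object getSenderMessage() { return senderMessage; }

    // Receiver side: fill the slot and release the waiting sender.
    synchronized void putReplyMessage(Object r) {
        reply = r;
        replied = true;
        notifyAll();
    }

    // Sender side: block until the receiver has replied.
    synchronized Object awaitReply() throws InterruptedException {
        while (!replied) wait();
        return reply;
    }
}
```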

Now, let’s run the program to ensure that it works in practice. If you ever wanted to know the sum of 1, 2, …, 100000, here it is.

>java ActiveBuffer -m 100000

max = 100000

Buffer Started

Server Started

Client-ODD Started

Client-EVEN Started

Client-EVEN Terminated

Client-ODD Terminated

Server Terminated

Buffer Terminated

--------------

Sum     : 5000050000

Expected: 5000050000
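The expected value follows from the closed form n(n+1)/2: for n = 100000 it is 100000 × 100001 / 2 = 5000050000, and for n = 10000 it is 50005000, as in the earlier run. Since 5000050000 is larger than Integer.MAX_VALUE (2147483647), the server’s sum, whose declaration is not shown here, presumably has to be a long; an int accumulator would wrap around silently. A quick check, using a hypothetical helper class SumCheck:

```java
class SumCheck {
    // Closed form n(n+1)/2, computed in long to avoid overflow.
    static long gauss(int n) { return (long) n * (n + 1) / 2; }

    // Loop with a long accumulator: exact for these sizes.
    static long sumLong(int n) {
        long sum = 0;
        for (int k = 1; k <= n; k++) sum += k;
        return sum;
    }

    // Same loop with an int accumulator: wraps silently past Integer.MAX_VALUE.
    static int sumInt(int n) {
        int sum = 0;
        for (int k = 1; k <= n; k++) sum += k;
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(gauss(100000));   // 5000050000
        System.out.println(sumLong(100000)); // 5000050000
        System.out.println(sumInt(100000));  // 705082704
    }
}
```

The int result is 5000050000 reduced modulo 2^32, which is why the overflow is easy to miss: it still looks like a plausible positive number.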

Conclusions

This article started from the problem of a simple communication link between two threads. I have shown you the minimum code needed to reliably transfer data between threads and pointed out many pitfalls along the way.

After presenting the basic mechanism, a mailbox, I generalized the idea into a message queue. The most important abstraction presented was Thread Method Invocation (TMI), a concept inspired by the Ada programming language and by Remote Method Invocation in Java.

I concluded with a discussion of non-deterministic message acceptance and how it can be implemented in Java.

There are more aspects of this topic than I have discussed in this article, which might be the subject of another article: for example, how to implement active objects based on our strategy for non-deterministic message acceptance, or the danger of deadlocks in concurrent systems.

You can find a link to a zip file containing all source code for this article. It includes the message queue library and all demo programs, plus an Ant build script. This script can compile all sources, run all unit tests, and generate JavaDoc for the library.