
Scenario 2: Actor scheduling

There is currently a lot of interest in actors, as they are more lightweight than threads and, thanks to their exclusive use of message-based communication, provide a more robust programming model. Behind the scenes, however, many actor frameworks utilise thread pools, the worker threads of which execute those actors with messages in their inboxes. The ForkJoinPool is therefore of particular interest for actor frameworks for the JVM.

We will use a small actor scheduling implementation, which uses similar mechanisms to those used in the Akka framework. Here we will confine ourselves to demonstrating task definition – the rest can be looked up in the GitHub repository.

import java.util.concurrent.RecursiveAction;

public class ActorForkJoinTask extends RecursiveAction {

    private final AbstractDispatcher dispatcher;
    private final Mailbox mailbox;

    public ActorForkJoinTask(AbstractDispatcher dispatcher,
                             Mailbox mailbox) {
        this.dispatcher = dispatcher;
        this.mailbox = mailbox;
    }

    @Override
    protected void compute() {
        int counter = 0;
        Message message;

        // Process at most MAX_CONSUME_BURST messages from the actor's inbox
        while (counter++ < ActorBenchmarkConfig.MAX_CONSUME_BURST
                && (message = mailbox.pollMessage()) != null) {
            mailbox.getActor().receive(message);
        }

        mailbox.setScheduled(false);

        // If messages remain, mark the actor for execution again
        if (!mailbox.isEmpty()) {
            dispatcher.scheduleUnlessAlreadyScheduled(
                    mailbox.getActor().getId());
        }
    }
}

ActorForkJoinTask is derived from RecursiveAction, as it does not require a return value. When the task is executed, messages are taken from the actor's inbox and passed to the actor for processing. If the inbox still contains messages once the upper limit for a single burst has been reached, the actor is marked for execution again. To achieve this, the dispatcher adds a new ActorForkJoinTask to the worker thread's local task queue.

The implementation of the task for the ThreadPoolExecutor is practically identical, but is derived from Runnable. If an actor is to be executed, the dispatcher adds a new task to the central inbound queue.
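For illustration, the Runnable-based variant might look roughly like the following sketch (the class name ActorRunnableTask is our own invention; the actual implementation can be looked up in the GitHub repository):

public class ActorRunnableTask implements Runnable {

    private final AbstractDispatcher dispatcher;
    private final Mailbox mailbox;

    public ActorRunnableTask(AbstractDispatcher dispatcher,
                             Mailbox mailbox) {
        this.dispatcher = dispatcher;
        this.mailbox = mailbox;
    }

    @Override
    public void run() {
        int counter = 0;
        Message message;

        // Same burst processing as in the ForkJoinPool variant
        while (counter++ < ActorBenchmarkConfig.MAX_CONSUME_BURST
                && (message = mailbox.pollMessage()) != null) {
            mailbox.getActor().receive(message);
        }

        mailbox.setScheduled(false);

        if (!mailbox.isEmpty()) {
            // The rescheduled task ends up in the executor's single,
            // shared inbound queue rather than a per-thread queue
            dispatcher.scheduleUnlessAlreadyScheduled(
                    mailbox.getActor().getId());
        }
    }
}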

With both the ForkJoinPool and the ThreadPoolExecutor, the dispatcher keeps an internal check to ensure that no actor is scheduled twice. This ensures that two threads will never execute the same actor simultaneously, so that the actual actor implementation does not need to worry about thread synchronisation.
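A minimal sketch of such a check for the ForkJoinPool case, assuming the mailbox guards its "scheduled" flag with an atomic compare-and-set (the helper names findMailbox and trySetScheduled, as well as the id type, are assumptions; the actual dispatcher code is in the GitHub repository):

public void scheduleUnlessAlreadyScheduled(long actorId) {
    Mailbox mailbox = findMailbox(actorId);   // lookup by actor id (assumed)

    // Atomically flip the "scheduled" flag from false to true; only the
    // thread that wins this race submits a new task for the actor.
    if (mailbox.trySetScheduled()) {
        ActorForkJoinTask task = new ActorForkJoinTask(this, mailbox);
        if (ForkJoinTask.inForkJoinPool()) {
            // Called from a worker thread: the task is pushed onto that
            // thread's local task queue
            task.fork();
        } else {
            // Called from outside the pool: the task goes through the
            // pool's central inbound queue
            forkJoinPool.execute(task);
        }
    }
}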

Analysing the results

The benchmark is structured in exactly the same way as in the MapReduce scenario. Below we examine the results from a scenario involving 1,000 actors. 200 of them start out with one message in their inbox; each of these messages is forwarded to a randomly chosen actor 100,000 times in succession before finally being discarded. When an actor is executed, it always processes just one message at a time.


The ForkJoinPool does better at actor scheduling than the ThreadPoolExecutor (figure 6).

Figure 6 shows that the ThreadPoolExecutor is entirely unsuited to this scenario. The shared inbound queue results in a great deal of contention, as can be seen from the thread states shown in figure 7.


Contention for access to the shared inbound queue means that the ThreadPoolExecutor's threads spend much of their time waiting (figure 7).

Thanks to its local task queues, the ForkJoinPool, by contrast, is able to accelerate the computation up to the number of virtual processors. Each worker thread continuously refills its own task queue with new actor tasks, so that no contention for access to the queues arises.

It is notable that the overall speed gain is fairly slight; in the case of the ThreadPoolExecutor it is always less than 1 (i.e. sequential computation is faster). There are two reasons for this. Firstly, parallel scheduling must satisfy the requirement described above that no actor is ever executed by multiple threads simultaneously, which entails a tangible synchronisation overhead that sequential processing avoids. Secondly, in the above example the actors generate hardly any workload when receiving a message (the messages are simply forwarded to another actor), so there is little to parallelise.

In terms of throughput, for actor scheduling, the ForkJoinPool clearly leads the way. But this scenario raises questions of fairness. Since a worker thread always plucks the most recently added task from its task queue, some actors may sit in the task queue for some time while other actors are being executed several times. For cases such as these, the ForkJoinPool provides the asyncMode parameter. If this is set, each worker thread processes the local task queue in the order in which the tasks were scheduled. By doing so, a worker thread risks finding itself competing with other threads, but using this parameter does ensure greater fairness. In the above example the overhead caused by using asyncMode is relatively small, so, for the sake of fairness, it is worth doing.
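Concretely, asyncMode is a constructor parameter of the ForkJoinPool. A minimal example of creating a pool in this mode (the parallelism value shown is just an illustration):

// Create a ForkJoinPool in asyncMode: each worker processes its local
// task queue in FIFO order, i.e. in the order the tasks were scheduled
ForkJoinPool pool = new ForkJoinPool(
        Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null,     // no special handler for uncaught exceptions
        true);    // asyncMode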

asyncMode does not, however, help with new tasks scheduled from outside. If worker threads are constantly generating new tasks for themselves, such tasks can be left high and dry in the inbound queue. This is one of the major reasons why recent developments based on ForkJoinPool have done away with the central inbound queue and instead distribute tasks scheduled from outside directly to the threads' task queues. The latest version of the implemented solution can be downloaded and tested from Doug Lea’s Concurrency JSR-166 Interest Site. The site also includes a number of classes that were not added to Java 7 or are not scheduled for inclusion until Java 8.

