Tuesday, August 12, 2008

Java Fork/Join + Groovy

This is an executable blog post. Download the post here and run it using Groovy with jsr166y.jar in your classpath. This was created and presented for the Groovy Users of Minnesota group.

Introduction

Brian Goetz on the need for better Java support for parallelism: "The Java language has had support for threads and concurrency from day 1; the language includes synchronization primitives such as synchronized and volatile, and the class library includes classes such as Thread. However, the concurrency primitives that were sensible in 1995 reflected the hardware reality of the time: most commercially available systems provided no parallelism at all, and even the most expensive systems provided only limited parallelism. In those days, threads were used primarily for expressing asynchrony, not concurrency, and as a result, these mechanisms were generally adequate to the demands of the time."

"As multiprocessor systems started to become cheaper, more applications needed to exploit the hardware parallelism they provided, and programmers found that writing concurrent programs using the low-level primitives provided by the language and class library was difficult and error-prone."

"As we enter the many-core era, we will need to find finer-grained parallelism or risk keeping processors idle even though there is plenty of work to do.... Java 7 will include a framework for representing a certain class of finer- grained parallel algorithms: the fork-join framework."

About Fork/Join
Fork/Join for Java is a framework developed by Doug Lea, based on his original paper from June 2000. Fork/Join algorithms are parallel versions of divide-and-conquer algorithms. If a task is small, then it is calculated sequentially on a single thread. If a task is large, then it is divided into several parts and each part is added to a queue for later computation. Queued tasks are free to divide again, be worked on by any worker thread, or queue up waiting if the maximum number of threads has been reached. When all the pieces have been divided up and calculated, a final result is assembled from the partial results of the distributed pieces. "Fork" refers to the division of tasks, and "Join" refers to the merging of results.

Fork/Join is similar to MapReduce in that they are both algorithms for parallelizing tasks. One difference is that Fork/Join tasks know how to subdivide themselves if too large, whereas MapReduce algorithms typically divide up all the work into portions before the algorithm starts.

The cleverness of fork/join lies in its worker scheduling method, called "work stealing". Each worker thread maintains a double-ended queue (deque) of tasks. Any forks of a task get pushed onto the head of that thread's deque, not back onto the general thread pool. Workers execute tasks in youngest-first order. When a worker has no more tasks to run, it attempts to steal a task from another worker by grabbing the tail of the other thread's deque. When a worker thread has no work and fails to steal any from others, it backs off.

One advantage of work stealing is reduced contention, because stealers take tasks from the opposite end of the deque from workers. Another advantage arises because recursive divide-and-conquer algorithms generate larger tasks early: older, stolen tasks provide larger units of work, which lead to further recursive division on the stealing thread.
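
To make the deque discipline concrete, here is a toy sketch using a plain java.util.ArrayDeque (not the framework's real internals) showing how an owner and a thief pick tasks from opposite ends:
def deque = new ArrayDeque()
['oldest', 'middle', 'youngest'].each { deque.addFirst(it) } // forks push onto the head

assert deque.peekFirst() == 'youngest' // the owning worker pops youngest-first (LIFO)
assert deque.peekLast() == 'oldest'    // a thief steals from the tail: the oldest,
                                       // and typically largest, unit of work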

Fork/Join is being developed by the JSR expert group under the name jsr166y. The main JSR 166 was included in Java 5 as the java.util.concurrent classes.

Useful Links
This file is a guide to using Groovy with Java Fork/Join (JSR 166y).
JSR Home: http://jcp.org/en/jsr/detail?id=166
Concurrency Interest: http://gee.cs.oswego.edu/dl/concurrency-interest/
Wiki: http://artisans-serverintellect-com.si-eioswww6.com/default.asp?W1
JSR API: http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166ydocs/
Downloadable Jar (Tested with Java 6): http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166y.jar
Original Paper: http://gee.cs.oswego.edu/dl/papers/fj.pdf
Brian Goetz on Fork Join:
Part 1: http://www.ibm.com/developerworks/java/library/j-jtp11137.html
Part 2: http://www.ibm.com/developerworks/java/library/j-jtp03048.html

import jsr166y.forkjoin.ForkJoinPool
import jsr166y.forkjoin.ForkJoinExecutor
import jsr166y.forkjoin.RecursiveAction
import jsr166y.forkjoin.ParallelArray
import jsr166y.forkjoin.Ops.Predicate
import jsr166y.forkjoin.Ops.Procedure
import jsr166y.forkjoin.Ops.ObjectToDouble
import jsr166y.forkjoin.Ops.DoubleReducer
import jsr166y.forkjoin.ParallelDoubleArray.SummaryStatistics
A ForkJoinPool is a host for a group of ForkJoinWorkerThreads that perform ForkJoinTasks. ForkJoinPool does not implement the Java ExecutorService interface because it only executes ForkJoinTasks, not arbitrary Runnables.

This code produces a Fork/Join pool with 10 workers:
ForkJoinExecutor pool = new ForkJoinPool(poolSize: 10);
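The named-argument form works because Groovy calls the no-arg constructor and then the setPoolSize() setter. The plain constructor calls would look like this (assuming, per the preview API, an int constructor and a no-arg form that defaults to one worker per available processor):
ForkJoinExecutor defaultPool = new ForkJoinPool() // assumed default: Runtime.runtime.availableProcessors() workers
ForkJoinExecutor sizedPool = new ForkJoinPool(10) // explicit worker count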
Everyone loves the fibonacci sequence. It will be the basis for a few examples showing how the core of Fork/Join works. Here is a sequential implementation:
def fib
fib = { n ->
    if (n <= 1) return n
    else return fib(n - 1) + fib(n - 2)
}

println "Sequential fib: ${fib(15)}"
// Output: Sequential fib: 610
In order to use Fibonacci with Fork/Join, we need to model the algorithm as an object that can both solve the problem and subdivide the problem (that is, both conquer and divide). This class can do just that. The solve() method simply invokes the sequential fib(), while the fork() method returns a two-element list representing parts which can be joined together later.
public class FibonacciProblem {
    int n

    def solve() { fibonacci(n) }

    def fork() {
        return [new FibonacciProblem(n: n - 1), new FibonacciProblem(n: n - 2)]
    }

    def fibonacci(n) {
        if (n <= 1) return n
        else return fibonacci(n - 1) + fibonacci(n - 2)
    }
}
Once the problem is modeled, we can wrap it in a Fork/Join Task that controls when to fork and when to solve (when to divide and when to conquer). The task also needs to hold the result in mutable state so that it can be queried later.
public class FibonacciTask extends RecursiveAction {
    FibonacciProblem problem
    int result

    @Override protected void compute() {
        if (problem.n < 4)
            result = problem.solve()
        else {
            def subproblems = problem.fork()
            def subTasks = subproblems.collect { new FibonacciTask(problem: it) }
            forkJoin(subTasks)
            result = subTasks.collect { it.result }.sum()
        }
    }
}
In the above example, notice how any problem less than 4 steps deep is simply solved inline; otherwise the problem is forked into multiple parts and re-queued using the forkJoin method. forkJoin is a convenience for separate fork() and join() calls, adding a list of subtasks to the deque.
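
For reference, here is a sketch of what forkJoin(subTasks) roughly expands to, assuming the separate fork() and join() methods that the convenience wraps (the real implementation may execute one subtask directly instead of forking it):
// Inside compute(), roughly equivalent to forkJoin(subTasks):
subTasks.each { it.fork() } // push each subtask onto this worker's deque
subTasks.each { it.join() } // wait for each subtask to complete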

One problem with modeling the fibonacci algorithm this way is that the algorithm is spread out across two classes: the *Problem and the *Task. The FibonacciProblem.fibonacci() method is complete, in that it uses the + operator to join its two results in its recursive final statement. But if the problem is split, then that + sign is represented in the FibonacciTask at the point where sum() is called. This is a duplication of logic! Perhaps it would be better to model the Problem and the Task as one entity... but then that entity might be overloaded because it has both task and problem functions.
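
Here is a hypothetical sketch of that single-entity approach; the joining logic (sum() vs. +) still appears twice, but at least it now lives in one class:
// A sketch only: FibonacciProblem and FibonacciTask folded into one class.
public class MergedFibonacciTask extends RecursiveAction {
    int n
    int result

    @Override protected void compute() {
        if (n < 4)
            result = fibonacci(n)
        else {
            def subTasks = [new MergedFibonacciTask(n: n - 1),
                            new MergedFibonacciTask(n: n - 2)]
            forkJoin(subTasks)
            result = subTasks.collect { it.result }.sum() // join the partial results
        }
    }

    def fibonacci(n) {
        if (n <= 1) return n
        else return fibonacci(n - 1) + fibonacci(n - 2)
    }
}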

Regardless, we can now throw a new FibonacciProblem on the framework and watch it compute across threads. Now, much larger results can be calculated without running out of stack space. The result should also be calculated faster. Disclaimer: this is an admittedly naive implementation of fib.
def task = new FibonacciTask(problem: new FibonacciProblem(n: 15))
pool.invoke(task)
println "Forked fib: $task.result"
// Output: Forked fib: 610
So far, it's fair to make two observations: modeling a problem as a recursive fork/join task is hard, and using Groovy to do it offers little value. Luckily, JSR166y offers some higher level abstractions so that we don't have to deal with the low level tasks and actions directly, and using these higher level constructs is where Groovy closures pay dividends. Many of the divide-and-conquer algorithms that fit a fork/join framework operate on array and list types. Because of this, it is possible to build up a large and useful library of functions and types to hide the messy details of the fork/join tasks.

The following examples are all going to operate on a "Student" type. We'll generate some random test data for 1000 students. This usage of adding methods to java.util.Random is probably not the best use case for meta-programming but it sure is cute.
class Student {
    int id
    int graduationYear
    double gpa         // 4 point scale
    double adjustedGpa // 5 point scale
}

Random.metaClass.randomYear = { 2000 + delegate.nextInt(11) }
Random.metaClass.randomGPA = { delegate.nextInt(41) / 10 }
Random rand = new Random();

def allStudents = (1..1000).collect {
new Student(id: it, graduationYear: rand.randomYear(), gpa: rand.randomGPA())
}
One of the core helper classes is the ParallelArray, an array that maintains an F/J executor in order to provide parallel aggregate operations. Creating one is pretty easy, but it requires a reference to the executor pool:
ParallelArray students = new ParallelArray(pool, allStudents as Student[])
Filtering selects a subset of the elements from the array. This is like a parallel version of Groovy's Collection#findAll(Closure). Note: The closure is not executed when withFilter() is called. It is lazily executed as needed, which in this case is when size() is called. Each invocation of the closure will potentially occur on a different thread. The return type is itself a functional array, so many of these methods may be chained, as you'll see later.
def graduates = students
    .withFilter({ it.graduationYear <= 2008 } as Predicate)

println "${graduates.size()} of the ${students.size()} students have graduated."
// Output: 831 of the 1000 students have graduated.
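A quick way to observe the laziness (the AtomicInteger counter is my addition, used because the predicate may run on many threads):
def calls = new java.util.concurrent.atomic.AtomicInteger()
def lazyFilter = students.withFilter({ calls.incrementAndGet(); it.gpa > 3.5 } as Predicate)

println calls.get() // 0: the predicate has not run yet
lazyFilter.size()   // forces evaluation, potentially across many threads
println calls.get() // now one invocation per element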
Application applies a void function to each element in the array, modifying the array in the process. It is a bit like Groovy's List#collect(Closure) except that it operates on the array itself and does not construct a new result. Again, each invocation of the closure potentially occurs on a different thread. However, in this case the closure is executed at the time apply() is invoked, not lazily.
students
    .apply({ it.adjustedGpa = it.gpa * 1.25 } as Procedure)

def s = students.get(0)
println "Student $s.id has GPA $s.gpa of 4.0 and $s.adjustedGpa of 5.0"
// Output: Student 1 has GPA 2.6 of 4.0 and 3.25 of 5.0
Mapping converts elements from the original array type to a new type, exactly as you would expect Groovy's List#collect(Closure) to work. In this case, each closure invocation potentially happens on a different thread, and the invocation occurs lazily as needed. In this particular example, however, each closure executes on the same thread as the script. Why? The sequentially() method returns the ParallelArray as an Iterable without parallel evaluation. Oops! Be careful when mixing Collection and ParallelArray functions.
def allGpas = students
    .withMapping({ it.gpa } as ObjectToDouble)

def perfectScores = allGpas.sequentially().inject(0, { acc, item ->
    if (item == 4.0) return acc + 1
    else return acc
})

println "$perfectScores students have a 4.0 average"
// Output: 24 students have a 4.0 average
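To keep the counting parallel instead, one option is to skip the Collection detour and let the framework do the filtering, reusing only the operations already shown above:
// Same count, but the comparison runs on the pool's worker threads:
def perfectScoresParallel = students
    .withFilter({ it.gpa == 4.0d } as Predicate)
    .size()
println "$perfectScoresParallel students have a 4.0 average"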
Reduce returns a reduction of the ParallelArray. The Groovy analog is Collection#inject(Object, Closure). In this example you can see how easy it is to chain a bunch of aggregate operators together. Again, each closure is potentially invoked on a different thread as needed.
double bestGpa = students
    .withFilter({ it.graduationYear == 2008 } as Predicate)
    .withMapping({ it.gpa } as ObjectToDouble)
    .reduce({ previous, current -> Math.max(previous, current) } as DoubleReducer, 0)

println "The best GPA is $bestGpa"
// Output: The best GPA is 4.0
Summarization is a convenience for all the common aggregate operators, like size(), min(), max(), indexOfMin(), indexOfMax(), sum(), and average().
SummaryStatistics summary = students
    .withFilter({ it.graduationYear == 2008 } as Predicate)
    .withMapping({ it.gpa } as ObjectToDouble)
    .summary()

ParallelArray.metaClass.getAt = { x -> delegate.get(x) } // add [] operators for convenience

def worstStudent = students[summary.indexOfMin()]
def bestStudent = students[summary.indexOfMax()]
println "Worst student: $worstStudent.id had GPA $worstStudent.gpa"
println "Best student: $bestStudent.id had GPA $bestStudent.gpa"
println "Average GPA: ${summary.average()}"
Summary
The Fork/Join ParallelArray type and libraries are very cool, even if they lack some of the conveniences expected from Groovy libraries. The tasks and actions, on the other hand, were much harder to grasp and work with. Defining an algorithm recursively is one thing to learn, but then having to figure out how to fork the work is another, more difficult problem.

There's also the issue of the verbose types the framework uses to represent function types. The public class Ops, for instance, is a convenience class that defines 132 different function types. Ouch, this is too much to remember, and it seems like you could use generics to define 3 generic function types rather than the 132 shown in the docs. I shudder to think how hard this would be to write without an IDE with auto-import and type discovery. Hooray for static typing tools.

Working with the ParallelArray class was a breeze from Groovy, given that the IDE helps you cast closures to the correct type. I can't imagine enjoying the framework from Java, with the proliferation of anonymous inner classes that it would require. I hesitate to say that the BGGA closure proposal would be that much better... the example closures fit in one line of code now, but adding generics and type definitions would probably push these to multiple lines, at which point it might become a mess to read. A version of the framework was made compatible with the BGGA proposal, but recent signs point to no closures in Java 7. Hooray for Groovy for supporting closures today.

The ParallelArray class is wonderfully useful, but where are ParallelList and ParallelMap? Tying the library to an array type limits its usefulness, and pretty much rules out infinite lists (I know, I know, it's an unbearable loss). I'd like to see these two types added to the library before release.

Also, why the inconsistent naming convention? The reduce function is called reduce and apply is called apply, but map is called withMapping and filter is called withFilter. This is another area I humbly submit for cleanup before release.

None of this solves the issue of mutable state within a multi-threaded environment. I heard second-hand that deadlock issues doomed the CILK project, which F/J is partly based on. Tasks are mutable by nature because the result is filled in at a later date, and it is left up to the developer to code defensively to avoid deadlocks, which pushes the developer back down into the assembly language of concurrency: synchronized and volatile. Is a parallelization framework not based on immutability doomed to failure?

Also, my intent was to use Terracotta to create a distributed Fork/Join executor. But the executors lack the fine-grained task control that distributed systems would need. In the case of a failing fork, there is no way for the executor to recall the forked problem piece and reassign it to a different worker or add it back into the deque. The CommonJ framework, which Terracotta recommends in their forum and docs, seems up to the task. Is a parallelization framework not based on distributed tasks doomed to failure?

AND ANOTHER THING! I easily downloaded the framework, browsed the docs, and started using it with Java 6. Why does this need to be in the JDK? With BEA's CommonJ and Functional Java, there seems to be movement in this space from many sources. I have no idea why F/J needs to be part of the JDK rather than just another 3rd party library.

I didn't mean for the gripes section to get so long, and it might be unfair to criticize a project that is still in the formalization process. It's a cool project and it's great to see some functional programming coming to the JDK. The ExecutorServices were a giant leap forward when included in Java 5, and perhaps the finished version of F/J will be the same thing for Java 7!

5 comments:

Alex Miller said...

Great post as usual Hamlet. I wanted to mention Brian Goetz's JavaOne talk as he covers this same territory in some detail (and shows how closures could improve things). On this page you can find the slides and the audio:

Let's Resync: What's New for Concurrency on the Java Platform

It was one of my favorites...

Andres Almiray said...

Hamlet, I agree that the framework might do well outside of the JDK, but there are companies that can't use non-JDK libraries without a lot of red tape (believe me, I know). Besides, adding it to the JDK instantly enables everyone (hooray for JRE bloat, go CORBA!)

ilya said...

One correction I'd add is regarding the note "deadlock issues doomed the CILK project."

That's not at all the case. Far from being doomed, the Cilk project has influenced the Intel Threading Building Blocks project, an upcoming standardized fork-join framework for Java, and a variety of projects at Microsoft. The paper presenting the original work was recently recognized as the most influential paper from a decade earlier. (Check out the 2008 Programming Language Design and Implementation Award post on our blog at www.cilk.com/multicore-blog/)

A number of real-world apps have been parallelized with MIT Cilk. What we found was that to reach broader adoption, we needed to add support for loops (rather than just recursion), support C++ rather than just C, and expand the supported platform base to include Windows. This is just what we are doing at Cilk Arts (founded by the inventors of Cilk).

check it out at www.cilk.com

Cheers,
ilya

Hamlet D'Arcy said...

Update: I criticized the ParallelArray Ops for having 132 different types when 3 generic types would suffice. On further digging, the Ops class does give you a few generic Ops, such as Ops.BinaryOp<A,B,R> and Ops.BinaryPredicate<A,B>.
The class also defines a whole bunch of primitive versions of these generic interfaces, which is why the interface count exploded. I was using the API wrong and could have used the generic version. My bad.