Saturday, December 12, 2009

Java 5 + GPars: Throttling Action Processing

An interesting question came up on the GPars mailing list today: In a system that generates events, what is the best way to throttle back event processing to one event per second? I thought about an answer... then thought some more... and finally decided to write it all up in this blog post. The example uses Groovy and GPars, but it is easily adapted to a generic Java solution. Don't let the actors scare you (or the lack of semicolons, for that matter)!

The example is the classic "Sleeping Barber" problem (I hadn't heard of it either). Basically, there is a barbershop. The barber is asleep. Customers walk into the waiting room periodically, and the barber wakes up to give each of them a haircut. When he's done he returns to his slumber. It's a lesson in reaction: something is asleep, then awakens to do some work, then returns to sleep.

The GPars docs provide a decent actor-based solution to this problem: there is a waiting room and a barber, and both are actors. When the barber is free, a customer from the waiting room is moved into the barber's chair. The barber and waiting room communicate via actor messages. But what if our barber is a bit of a diva, and no matter how busy the shop gets, he wants to give one haircut every 15 minutes and never any more? (Otherwise, he might get burnt out, you see.) That is the throttling problem: how do you make sure events are processed (a haircut is given) no more than x times in a given time period?

My solution: keep a work queue, and have a scheduled executor pull work off the queue at a specified interval. Java 5 gives you all the tools to do this without resorting to busy waiting, polling, or writing scheduling code. The classes you need to know about are ArrayBlockingQueue and ScheduledThreadPoolExecutor.

ArrayBlockingQueue is a bounded FIFO (First-In-First-Out) queue that supports blocking instead of busy waiting. When you take() an item from an ABQ, the call blocks until an item is available... no polling or sleeping to see if an item is ready. Just call take() and your code won't proceed until an element is available.
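
To see the blocking behavior in isolation, here is a minimal plain-Java sketch (the post's own code is Groovy; this version uses Java 8 lambdas for brevity). The class name BlockingTakeDemo, the helper millisBlockedInTake, and the 200 ms delay are made up for illustration. A producer thread waits a bit before adding a customer, and the calling thread simply sits in take() until that happens:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingTakeDemo {

    /** Blocks in take() until a producer thread adds an item; returns the wait in ms. */
    static long millisBlockedInTake(long producerDelayMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        // Hypothetical producer: a customer who shows up a little late.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(producerDelayMillis);
                queue.put("Jerry");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        long start = System.nanoTime();
        try {
            queue.take(); // no polling loop: the thread waits here until put() happens
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("take() blocked for about " + millisBlockedInTake(200) + " ms");
    }
}
```

The consumer never checks the queue in a loop; it is parked inside take() and only resumes once the producer's put() hands it an element.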

The ScheduledThreadPoolExecutor runs a Runnable repeatedly, either at a fixed rate or with a fixed delay between runs; it can also run a Runnable or a Callable once after a delay. If you're looking to execute the same task every 1 second then STPE is what you need... Timer, for all intents and purposes, has been superseded by it.
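
As a rough illustration of the scheduling API, here is a small plain-Java sketch. FixedRateDemo, startTimes and runOnce are hypothetical names, and the millisecond periods are chosen only so the demo finishes quickly. Repeating schedules take a Runnable; a Callable can be scheduled too, but only as a one-shot task that returns a value:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedRateDemo {

    /** Runs a task `runs` times, one per period, and returns each start time in ms since scheduling. */
    static List<Long> startTimes(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        List<Long> times = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(runs);
        long origin = System.nanoTime();

        // The repeating task is a Runnable; it only records when it ran.
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                times.add((System.nanoTime() - origin) / 1_000_000);
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // wait until the task has run `runs` times
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // cancels the repeating task
        }
        synchronized (times) {
            return new ArrayList<>(times);
        }
    }

    /** Schedules a Callable once and returns its result. */
    static String runOnce(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Callable<String> task = () -> "done";
        try {
            return scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("start times (ms): " + startTimes(3, 100));
        System.out.println("one-shot result: " + runOnce(50));
    }
}
```

The pool size of 1 means runs never overlap, which is the property the barber example relies on.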

So here's the barber that just won't stand to be over-worked... setting it all up we need customers, a barber, and a waiting room:

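import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

class Customer {
    String name
}

// waiting room can't hold more than 12!
def waitingRoom = new ArrayBlockingQueue(12)

// a single-threaded scheduler: there is only one barber
def barber = Executors.newScheduledThreadPool(1)
barber.scheduleAtFixedRate({
    println 'Barber: Next customer please!'
    // blocks until a customer is waiting
    Customer customer = waitingRoom.take()
    println "${customer.name} gets a haircut at ${new Date().format('H:m:s')}"
} as Runnable, 0, 15, TimeUnit.MINUTES)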


Customer is a simple bean; nothing interesting here. The waiting room is an ArrayBlockingQueue that holds customers who need a haircut. And the barber is an executor service with a scheduled task to give haircuts. The number of threads in the scheduled thread pool is 1 because there's only one barber. The barber takes customers from the waiting room and cuts their hair once every 15 minutes. The call to waitingRoom.take() is blocking... if there is a customer ready then he is serviced immediately, and if not, then the call blocks until someone is available. One thing to note... the waitingRoom has a capacity of 12... if a 13th customer is added then the calling code will either block until there is enough room or throw an exception. There is an API for either case: put() blocks until a seat frees up, while add() throws an IllegalStateException.
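
For readers who want the plain-Java adaptation, here is one possible sketch of the same queue-plus-scheduler idea. It is not a line-for-line port: ThrottledBarber, enter, close and haircutTimes are hypothetical names, customers are plain strings, and the interval is in milliseconds so the demo runs quickly. It also deliberately swaps scheduleAtFixedRate for scheduleWithFixedDelay. With a fixed rate, a run that sits in take() for longer than the period makes later runs start late (per the Javadoc), which can mean several haircuts in quick succession after a quiet spell; a fixed delay is measured from the end of the previous run, so haircuts stay at least one interval apart. The enter() method uses offer(), a third option that returns false instead of blocking or throwing when the room is full:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ThrottledBarber {
    private final BlockingQueue<String> waitingRoom;
    private final ScheduledExecutorService barber = Executors.newScheduledThreadPool(1);

    ThrottledBarber(int seats, long intervalMillis, Consumer<String> haircut) {
        waitingRoom = new ArrayBlockingQueue<>(seats);
        // Fixed delay (an assumption of this sketch, not the post's choice):
        // the next run starts intervalMillis after the previous one finished.
        barber.scheduleWithFixedDelay(() -> {
            try {
                haircut.accept(waitingRoom.take()); // blocks until a customer is waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() interrupts a blocked take()
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns false, without blocking or throwing, when the waiting room is full. */
    boolean enter(String customer) {
        return waitingRoom.offer(customer);
    }

    void close() {
        barber.shutdownNow();
    }

    /** Demo helper: serves the given customers and returns haircut times in ms relative to the first. */
    static List<Long> haircutTimes(long intervalMillis, String... names) {
        List<Long> times = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch allDone = new CountDownLatch(names.length);
        ThrottledBarber shop = new ThrottledBarber(Math.max(1, names.length), intervalMillis, name -> {
            times.add(System.nanoTime() / 1_000_000);
            allDone.countDown();
        });
        try {
            for (String name : names) {
                shop.enter(name);
            }
            allDone.await(); // wait until everyone has had a haircut
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shop.close();
        }
        synchronized (times) {
            List<Long> relative = new ArrayList<>();
            for (long t : times) {
                relative.add(t - times.get(0));
            }
            return relative;
        }
    }

    public static void main(String[] args) {
        System.out.println("haircut times (ms): " + haircutTimes(1000, "Jerry", "Phil", "Bob", "Ron"));
    }
}
```

Whether fixed rate or fixed delay is the better fit depends on what "one haircut every 15 minutes" should mean after an idle period, so treat the choice here as one option rather than the answer.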

So how do customers get into the waiting room? That's where GPars actors come in. The barbershop is a "reactor" in GPars terminology. Messages can be sent to the barbershop ("Enter" the waiting room), and the reactor adds the customer to the waiting room. A maître d' of sorts. Here it is in action:

// PooledActorGroup comes from GPars (import not shown)
def barberShop = new PooledActorGroup().reactor { message ->
    switch (message) {
        case Enter:
            println "${message.customer.name} waits for a haircut..."
            // add() throws IllegalStateException if the waiting room is full
            waitingRoom.add(message.customer)
            break
    }
}

class Enter {
    Customer customer
}

barberShop << new Enter(customer: new Customer(name: 'Jerry'))
barberShop << new Enter(customer: new Customer(name: 'Phil'))
barberShop << new Enter(customer: new Customer(name: 'Bob'))
barberShop << new Enter(customer: new Customer(name: 'Ron'))
The barberShop is a reactor created from a PooledActorGroup, a GPars object, and using the "actor framework" here just means passing a closure to the reactor() method of that group. The closure, or actor, responds to Enter messages by adding the customer to the waitingRoom. At the bottom you see the nice << syntax for posting messages to the actor.

So there you have it. There are many ways to do this, but I think the Java 5 Concurrency libraries are some of the best options. I'd be interested to hear other ideas too. Now go give those hippies some haircuts!

4 comments:

paulk_asert said...

Very nice example! One minor improvement you can make: a Closure already implements Runnable, so you can remove the 'as'.

Aniket Awati said...

yeah the example is very nice

Chad LaVigne said...

Cool post Hamlet. Another solution that comes to mind is to use the token bucket algorithm, which would remove the need for any threading code. I've used it in the past for similar situations such as needing to throttle the number of log messages sent to an e-mail appender in a given period of time.

Hamlet D'Arcy said...

@Chad cool, i had not heard of the token bucket... looks like I'll play around with it.