Asynchronous functions, actors and CPS

Using Scala we can easily create lock-free multi-threaded code while still maintaining a familiar programming model.

 

Writing multi-threaded code raises two issues: synchronizing shared access to resources and running code asynchronously. In Java this is usually done with locks and by implementing interfaces that wrap the asynchronous code. This leads to code that is cumbersome to write, error-prone and inefficient in how it uses threads (since a thread may block for an unspecified time waiting on a lock).

 

In Scala we have actors (of course there are actor libraries for Java as well, they are just less popular). However, using actors means giving up type safety (we cannot know statically the exact set of messages an actor handles), and actors are also cumbersome to use as a replacement for method calls.

 

Instead of just calling a method, one has to create a message and the actor needs to receive it and call the actual method. There are situations where actors provide the right abstraction, by being able to determine the order in which messages are handled, but this is not the focus of this post.

 

In this post I’d like to present an alternative way of programming that uses actors behind the scenes while giving front stage to objects and method calls.

 

First, let’s model an asynchronous method. Such a method handles the computation in another thread. Since we don’t want to block, we create a followup function that is called once the computation is done:


def mult(i: Int, j: Int)(followup: Int => Unit)

 

We use it like:

 

mult(3, 4) {r => println(r)}

 

In this case, implementing mult is trivial:

 

def mult(i: Int, j: Int)(followup: Int => Unit) =
  followup(i * j)


(In general, followup should probably be modeled as taking an Either of value or error, but I’d like to keep things simple for this post.)
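As a rough sketch of the Either variant mentioned above, the followup could receive either an error or a value. The names `EitherFollowup` and `div`, and the choice of `Throwable` for the error side, are assumptions made for illustration and are not part of the original code:

```scala
// Hypothetical example: the followup receives either an error or a value.
object EitherFollowup {
  type Followup[A] = Either[Throwable, A] => Unit

  // Division can fail, so the failure is reported through the followup
  // instead of being thrown in whichever thread runs the body.
  def div(i: Int, j: Int)(followup: Followup[Int]): Unit =
    followup(
      try Right(i / j)
      catch { case e: ArithmeticException => Left(e) }
    )
}
```

A caller would then pattern match on the result, for example `EitherFollowup.div(8, 2) { case Right(r) => println(r); case Left(e) => println(e.getMessage) }`.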

 

The body of mult runs in the thread of the caller. This is not good in the general case where a method modifies the state of an object, or where other methods modify the state and we want to avoid collisions.

 

One way to synchronize all methods is to use an actor. The trivial way is to create an internal actor inside the object and send it messages that do the computation:

 

import scala.actors.Actor._  // provides actor, loop and react

class Obj {
  case class Mult(i: Int, j: Int, followup: Int => Unit)
  def mult(i: Int, j: Int)(followup: Int => Unit) =
    impl ! Mult(i, j, followup)

  // the internal actor handles one message at a time
  val impl = actor {
    loop {
      react {
       case Mult(i, j, followup) => followup(i * j)
      }
    }
  }
}
 

 

Of course this is very cumbersome. The messages are only used internally, and quite trivially. Instead, we can use the actor to evaluate functions that each method creates:

 

trait AsyncSupport {
  val syncActor = actor {
    loop {
      react {
       case f: Function0[_] => f()  // the type argument is erased at runtime, so match on Function0[_]
      }
    }
  }
 
  def sync(body: => Unit) = syncActor ! {() => body}
}

 

(This is of course not a full implementation; it still needs handling of the actor’s exit and maybe an explicit start.)
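To give an idea of what a fuller version might look like, here is a minimal sketch that includes exit handling. It makes a substitution: because `scala.actors` is no longer part of recent Scala distributions, a single-threaded `java.util.concurrent` executor plays the role of the internal actor. The names `ExecutorAsyncSupport`, `shutdown` and `Counter` are hypothetical.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

// Sketch only: a single-threaded executor stands in for the internal actor.
trait ExecutorAsyncSupport {
  // One worker thread, so bodies passed to sync run one at a time, in order.
  private val syncExecutor: ExecutorService = Executors.newSingleThreadExecutor()

  def sync(body: => Unit): Unit =
    syncExecutor.execute(new Runnable { def run(): Unit = body })

  // Explicit exit handling: stop accepting work, then wait for queued bodies.
  // Returns true if everything finished within the timeout.
  def shutdown(): Boolean = {
    syncExecutor.shutdown()
    syncExecutor.awaitTermination(5, TimeUnit.SECONDS)
  }
}

// Hypothetical user of the trait: state is only touched inside sync.
class Counter extends ExecutorAsyncSupport {
  private var total = 0
  def add(n: Int)(followup: Int => Unit): Unit =
    sync { total += n; followup(total) }
}
```

The single worker thread gives the same guarantee the actor gave: no two bodies run at the same time, so `total` needs no lock.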

 

Then implementing mult is:

 

class Obj extends AsyncSupport {
  def mult(i: Int, j: Int)(followup: Int => Unit) =
    sync{followup(i * j)}
} 

 

So far, so good. But as we create more and more such functions, the code starts to become unreadable. For example, consider the expression (a + b) * (c + d). With asynchronous methods this would be:

 

add(a, b){r1 => add(c, d){r2 => mult(r1, r2)(followup)}}

 

One solution is to use a for comprehension to hide the followup functions. The code would then look like:

 

for (r1 <- add(a, b);
     r2 <- add(c, d);
     r  <- mult(r1, r2)
    ) followup(r)

 

This has two drawbacks. First, it isn’t much more readable than the original code. Second, it forces us to redesign our methods to conform to the rules of for comprehensions, which makes them less usable outside that context.
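To make the second drawback concrete, here is a sketch of the kind of redesign a for comprehension would require. The wrapper class `Async` and the object `AsyncOps` are hypothetical, and for brevity the bodies run in the caller's thread rather than through ‘sync’:

```scala
// Hypothetical wrapper: an asynchronous result that a for comprehension can consume.
final class Async[A](val run: (A => Unit) => Unit) {
  // foreach is what a for comprehension without yield desugars to.
  def foreach(followup: A => Unit): Unit = run(followup)
  // map and flatMap would be needed for the yield form.
  def map[B](f: A => B): Async[B] =
    new Async[B](k => run(a => k(f(a))))
  def flatMap[B](f: A => Async[B]): Async[B] =
    new Async[B](k => run(a => f(a).run(k)))
}

object AsyncOps {
  // The methods now have to return Async instead of taking a followup.
  def add(i: Int, j: Int): Async[Int] = new Async[Int](k => k(i + j))
  def mult(i: Int, j: Int): Async[Int] = new Async[Int](k => k(i * j))

  // (a + b) * (c + d), delivered to followup when ready.
  def compute(a: Int, b: Int, c: Int, d: Int)(followup: Int => Unit): Unit =
    for {
      r1 <- add(a, b)
      r2 <- add(c, d)
      r  <- mult(r1, r2)
    } followup(r)
}
```

Note how ‘add’ and ‘mult’ no longer have the plain "arguments, then followup" shape: every caller now has to go through the wrapper.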

 

Instead, we can use CPS (continuation-passing style).

 

This post is not intended to introduce CPS, just to show its application here. I recommend searching for related articles (one linked below).

 

As a brief overview, CPS lets us take code that looks like this:

 

expr1 {a => expr2(a)}

 

And turn it “inside out” into something like

 

reset {
  val a = shift {expr1}
  expr2(a)
}

 

‘shift’ and ‘reset’ can be given other more descriptive names.

 

For example, it allows taking this code:

 

List("x","y","z").flatMap{left =>
   List(4,5,6).flatMap{right =>
     List((left, right))
  }
}

 

And transforming it to:

 

reset {
  val left = List("x","y","z")
  val right = List(4,5,6)
 
  List((left.reflect[Any], right.reflect[Any]))
}

 

(I didn’t include the definition of ‘reflect’; you can find it here: http://dcsobral.blogspot.com/2009/07/delimited-continuations-explained-in.html)

 

This is close to the syntactic sugar that a for comprehension provides:

 

for (left <- List("x","y","z");
       right <- List(4,5,6))
    yield (left, right)

 

So, in our case, the aim is to be able to rewrite

 

add(a, b){r1 => add(c, d){r2 => mult(r1, r2)(followup)}}

 

As something like:

 

val r1 = add(a,b)
val r2 = add(c,d)
mult(r1, r2)

 

One way is to completely transform ‘add’ and ‘mult’ into CPS methods.

 

def add(i: Int, j: Int) = shift {k: (Int => Unit) => sync{k(i + j)}}
def mult(i: Int, j: Int) = shift {k: (Int => Unit) => sync{k(i * j)}}

 

And use it:

 

scala> def followup(r: Int) = println("Result: " + r)
followup: (r: Int)Unit

scala> reset {
     |   val r1 = add(1,2)
     |   val r2 = add(3,4)
     |   val r = mult(r1, r2)
     |   followup(r)
     | }
Result: 21

 

A disadvantage of this approach is that our methods are now only usable in the context of CPS (‘reset’). That makes it harder to do simple compositions, or to allow straightforward use for those who want it.

 

Let’s look again at the original definitions:

 

def mult(i: Int, j: Int)(followup: Int => Unit) =
    sync{followup(i * j)}

 

It is obvious that ‘followup’ is our ‘k’ from the CPS code.

 

So how do we go from a simple method with a followup argument to a CPS method? With partial application:

 

scala> mult(3,4) _

res3: ((Int) => Unit) => Unit = <function1>

 

The result of the partial application is a function that given a function (Int => Unit) will perform some computation.
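The partial application can also be tried outside the REPL. The sketch below is a self-contained illustration; the object name `PartialApplication` and the value name `pending` are made up for this example. It gives the value an explicit function type instead of using the trailing underscore, which should have the same effect:

```scala
object PartialApplication {
  def mult(i: Int, j: Int)(followup: Int => Unit): Unit =
    followup(i * j)

  // Supplying only the first argument list leaves a function that is
  // still waiting for its followup. The explicit type triggers eta-expansion.
  val pending: (Int => Unit) => Unit = mult(3, 4)
}
```

Nothing is computed until a followup is supplied, for example `PartialApplication.pending(r => println(r))`.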

 

To use it, we define some syntactic sugar:

 

  type Followup[Result] = Result => Unit
 
  class CpsSupport[Result](f: Followup[Result] => Unit) {
    def ! = shift {k: Followup[Result] => f(k) }
  }
 
  implicit def cps[Result](f: Followup[Result] => Unit) = new CpsSupport[Result](f)

 

 

Then, our methods are defined in the standard way, without CPS, and the usage is:

 

scala> reset {
     |   val r1 = add(1, 2) _!
     |
     |   val r2 = add(3, 4) _!
     |
     |   val r = mult(r1, r2) _!
     |
     |   followup(r)
     | }

scala> Result: 21

 

An alternative way is to put the body of ‘!’ directly in ‘cps’:

 

def cps[Result](f: Followup[Result] => Unit) = shift {k: Followup[Result] => f(k) }


 

 

And use it as:

scala> reset {
     |   val r1 = cps(add(1, 2))
     |
     |   val r2 = cps(add(3, 4))
     |
     |   val r = cps(mult(r1, r2))
     |
     |   followup(r)
     | }

scala> Result: 21

 

 

(Note: In the ‘_!’ version, the blank lines between the method calls are required; otherwise Scala thinks that each line is a parameter to the previous one and complains that ‘!’ takes no parameters.)

 

The methods ‘add’ and ‘mult’ need to use ‘sync’ explicitly. This guards both their body (‘i * j’) and the call to followup, which may be any code, including code that modifies the object and therefore needs to be synced. Instead, the method ‘!’ can call sync. This can be done explicitly, or by using an implicit adapter that receives ‘k’ and adapts it. I leave this as an exercise for the reader.

 

This adapter can also be used when the followup function has an arity higher than 1. In that case, ‘k’ will be a function receiving a tuple, and the adapter’s job will be to convert it to a FunctionN.
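For arity 2, the conversion could be sketched with `Function.untupled` from the standard library. The method `divMod` and the helper `adapt` are hypothetical names used only for this illustration, and the sketch leaves out the ‘sync’ and ‘shift’ parts:

```scala
object TupleAdapter {
  // Hypothetical method whose followup takes two results.
  def divMod(i: Int, j: Int)(followup: (Int, Int) => Unit): Unit =
    followup(i / j, i % j)

  // The continuation k receives a single tuple; Function.untupled
  // turns it into the two-argument function that divMod expects.
  def adapt[A, B](k: ((A, B)) => Unit): (A, B) => Unit =
    Function.untupled(k)
}
```

With this, `TupleAdapter.divMod(7, 2)(TupleAdapter.adapt[Int, Int](t => println(t)))` passes both results to ‘k’ as one tuple.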

Comments

First, thanks for the time and effort put into writing the article. I am not that familiar with Scala, however since Java 5, it is much easier to write correct multi-threaded code in Java.

 

With the introduction of CountDownLatch (http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/CountDownLa...) and its counterpart CyclicBarrier (http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/CyclicBarri...) for instance, locking is simpler.

For an example, see http://www.javamex.com/tutorials/threads/CountDownLatch.shtml, which shows how to make several threads start at the same time and how to wait for several threads to complete.

 

Of course Java 1.5 also introduced the ExecutorService, and if you compare it with the code that had to be written in previous JDK versions you will see a major difference. See for example this code from the google-gson project (adapted from one of their test cases):

 

/**
   * Source-code based on
   * http://groups.google.com/group/google-gson/browse_thread/thread/563bb51e...
   */
  public void testMultiThread() throws InterruptedException {
    final CountDownLatch startLatch = new CountDownLatch(1);
    final CountDownLatch finishedLatch = new CountDownLatch(10);
    final AtomicBoolean failed = new AtomicBoolean(false);
    ExecutorService executor = Executors.newFixedThreadPool(10);
    for (int taskCount = 0; taskCount < 10; taskCount++) {
      executor.execute(new Runnable() {
        public void run() {
          MyObject myObj = new MyObject();
          try {
            startLatch.await();
            for (int i = 0; i < 10; i++) {
              String json = gson.toJson(myObj);
            }
          } catch (Throwable t) {
            failed.set(true);
          } finally {
            finishedLatch.countDown();
          }
        }
      });
    }
    startLatch.countDown();
    finishedLatch.await();
    assertFalse(failed.get());
  }

 

The point was not to create correct code, but to create code without any locks.

 

Locks have several disadvantages:

  • A locked thread is put to sleep for an unknown time. It means you have no control over it and hence no control over your resources. Threads are a costly resource (1M per thread), so if you have a lot of concurrent processing, you need a lot of threads (e.g., wasting 0.5G just on threads).
  • Resolving contention between many threads takes resources.
  • Locks work only on a single machine (or are very costly between machines). With the solution I proposed, the different calculations can be moved to remote machines.

Hi,

I am missing something, you wrote:

"Using  Scala we can easily create lock free multi-threaded code".

 

1-Either you have a shared resource that two (or more) threads are trying to write to at the same time, and hence one of the threads has to block while the other is locking the resource. Unless I am mistaken, this holds irrespective of the programming language used.

 

2-Or your multi-threaded code does not have a shared resource accessed in tandem by two threads. In this case I agree, the code will be lock free since there is nothing to lock.

 

And with respect to java:

1-You can put a locked thread to sleep for a known time.

2- The default stack size per thread is not 1MB as you wrote. Rather, it depends on both your OS (e.g. Sparc or Linux or Windows) and on whether it is a 32-bit or a 64-bit machine. If I remember correctly, the default is 256KB for a 32-bit machine and can be changed with the -Xss parameter. Most applications can run with 256KB without getting a java.lang.StackOverflowError. Even if you have 250 Tomcat threads on a 64-bit machine (e.g. 250MB), it is only a small slice of the total memory usually allocated on 64-bit machines (e.g. 8 or 16 GB).

 

3-There are many third-party modules that can be used to coordinate the execution of different calculations on remote machines. JMeter, to name just one, has been doing that for years.

 

I am trying to advocate the use of the new threading API and not deter potential users from utilizing it just because of "contention".

 

Thanks,

In my case, I have an asynchronous function. It works in the background and I don't want to stop the calling thread waiting for it to finish.

The Java ExecutorService executes tasks asynchronously, so this is exactly what you wanted:

 

ExecutorService executor = Executors.newFixedThreadPool(5);
for (Runnable runnable : runnables) {
    executor.execute(runnable);
}

...

...
executor.shutdown();

How do I get the results? (without locking)

 

It's obvious I could write the internal mechanism using executors instead of actors. Actors are just more flexible and so are easier to write with. (internally they use some kind of executor).

 

It looks like all you see are nails. Maybe it's time to have something other than a hammer? 

see "Using Callable to Return Results From Runnables" :  http://blogs.sun.com/CoreJavaTechTips/entry/get_netbeans_6

and "Concurrency: I Can See Into Your Future" http://www.javalobby.org/forums/thread.jspa?messageID=91836328

 

"When we submit a runnable to an executor, it is much like calling start on a thread; it returns immediately, and you just simply continue processing:"

 

Runnable runnable = //...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(runnable); // returns immediately.

Callable<String> callable = //...
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(callable); // returns immediately.

"Doing so with a callable is similar, but we also get this handy dandy Future object as a return value."

 

If you give someone a hammer, all he can do is look for nails. Maybe you can give me a paint brush, then I can paint something for you :)

 

And how do you get results from a future without locking (hanging) the current thread???

 

The basic pattern in the article is that instead of waiting for the results, you pass the function a function of what to do with them when they are ready. So your whole chain of computation is carried into the future. Actors are used to convert a synchronous computation into an asynchronous one, to free the current thread. This bit could have been done with executors, callables, threads and what not, but I think with more boilerplate than using actors.
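The same "pass a function" idea can be sketched with a future that never blocks the caller. The example below uses `scala.concurrent.Future` from the current standard library, which is not what the article itself uses; `FutureFollowup` and `multAsync` are hypothetical names. The returned future exists only so that a test or a shutdown hook can tell when the followup has run:

```scala
import scala.concurrent.{ExecutionContext, Future}

object FutureFollowup {
  // Hypothetical helper: runs the multiplication on the given execution
  // context and hands the result to followup there; the caller never waits.
  def multAsync(i: Int, j: Int)(followup: Int => Unit)(
      implicit ec: ExecutionContext): Future[Unit] =
    Future(i * j).map(followup)
}
```

The calling thread returns as soon as the task is submitted; the followup runs later on a thread of the execution context.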

 

CPS is then used to bring back the traditional "look and feel" of code that reads line by line instead of as a chain of callbacks.

 

I'm not suggesting that this paradigm is suitable for all cases of concurrency. I have used lock-based approaches in the past and will continue to use them. But for my particular case, the "followup" paradigm was a better fit, so I chose it.

If you're into concurrent programming in general, you should really look into Scala. You can use all Java classes, but you also have the options provided by the standard Scala libraries:

 

 

  •  

Thanks for the references, I am going to take a look at them.

 

In addition, I posted this link a year ago, might also be helpful:

http://ruben.savanne.be/articles/concurrency-in-erlang-scala

 

Interesting post. Enjoyed reading it. I was recently tinkering with the same topic but was using a continuation monad (scala.Responder instead of reset/shift) and Kleisli structures (provided by scalaz) to compose asynchronous functions. This supports CPS-based composition based on a syntax like asyncFn1 >=> asyncFn2 >=> asyncFn3 >=> ... Here's a simple example: https://gist.github.com/758761.