When flows split

16 Dec 2020 - Vasco Veloso


An introduction to concurrent programming

A computer program is a nicely ordered flow of instructions, executed one after the other from start to finish… Except when it is not. We often wish to split this flow. Let’s see why and when to do it.

So, what’s concurrent programming?

The traditional way of writing programs is to express them sequentially. When one input is received, an output is produced by executing a number of tasks in order. Then the program becomes ready to process the next input, over and over again.

Figure 1 – sequential computing

This way of working matches how machines with a single processing core work. A single processor can only do one thing at a time, so this is the most straightforward method to program on it.

There are drawbacks, however. If any of the tasks needs to wait for something, say, a peripheral or the network, then the processor will remain idle during this waiting period. Processing of further inputs will be delayed, thus reducing the overall system effectiveness.

Ideally, processor time could be spent advancing one task for the next input while the processing of the previous input is waiting. This way a shared resource – the processor – could be used efficiently by different computations. To the human observer, both computations would seem to be performed simultaneously, while in reality they were being interleaved.

Figure 2 – concurrent computing

We just defined concurrent computing while barely noticing it. Any two computations are concurrent whenever they share a common resource. They don’t occur at the same exact moment in time, but their lifespans overlap. Concurrent programming is the set of programming techniques that allow us to write programs that work in this manner.

The moment in time when computations happen is a subtle but important detail that allows us to distinguish between concurrent and parallel computing. Parallel computations actually overlap at one or more moments in time, and they do not necessarily share resources.

Figure 3 – parallel computing

It follows that we need to employ different physical processors in order to actually run some parallel computing. Otherwise, such computations would not be parallel but concurrent. Multi-processor computers and distributed systems are examples of computational hardware that is capable of executing parallel computations. Parallel programming is thus a specialization of concurrent programming, using similar techniques, focused on ensuring that computations occur at the same physical time.

Which operations fit a concurrent model best?

Going back to our task that needs to wait for a resource: we concluded that processor time was not being used efficiently. It is therefore logical that operations which need to wait for something else are prime candidates for concurrent execution. Running them exclusively in parallel makes little sense, because processors would still sit idle during the waits while throughput barely improves.

Tasks that don’t wait are perfect candidates for parallelism. Think of data processing and, above all, pure computation. Work that neither waits nor shares resources benefits the most from running in parallel.

For the remainder of this article, we will focus on concurrency instead of parallelism, since the most common challenges fall within this category. This does not mean that nothing said here applies to parallel computing; most of it does, provided one chooses the correct supporting framework.

Concurrent programming is hard

Concurrency is all about resource sharing. The most evident – and problematic – kind of sharing is writing to a shared data location. A classic example is the bank account deposit problem. If two tasks want to deposit one euro in the same bank account at the same time, money can be lost depending on how the independent deposit steps are interleaved. This is known as a race condition: the outcome depends on the order in which the concurrent operations execute.
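The lost update is easy to reproduce on the JVM. The sketch below is our own illustration (using plain threads and a shared counter rather than a bank account): two "depositors" increment a shared balance, once without synchronisation and once through an AtomicInteger.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Two "depositors" each add 100,000 units to a shared balance.
fun race(): Pair<Int, Int> {
    var unsafe = 0              // plain read-modify-write: not atomic
    val safe = AtomicInteger(0) // atomic increment: no lost updates
    val depositors = List(2) {
        thread {
            repeat(100_000) {
                unsafe++
                safe.incrementAndGet()
            }
        }
    }
    depositors.forEach { it.join() }
    return unsafe to safe.get()
}

fun main() {
    val (unsafe, safe) = race()
    // The atomic counter is always 200000; the plain one is usually lower,
    // because interleaved increments can overwrite each other's writes.
    println("unsafe: $unsafe, safe: $safe")
}
```

The plain increment is three steps (read, add, write), and two threads interleaving those steps lose deposits exactly as described above.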

Other well-known problems are starvation and deadlocks, which can occur when multiple tasks share a finite set of resources. Resources may be claimed by all tasks in such a way that each task still lacks the resources it needs to continue: no task makes any progress, which is a deadlock. If only a subset of the tasks is blocked from continuing, we say that those tasks are being starved. The dining philosophers problem [2] is a classic study in which both problems may occur, depending on the solution chosen.

All this goes to show that concurrent systems possess non-deterministic characteristics, i.e., the time and order of operations are not always the same and depend on external conditions. Think about the load on a web shop during a sale. The timing of operations is widely different during such peak times. Some defects (like the deposit problem) may cause trouble during these periods and remain harmless the rest of the time. These can be very hard to detect, fix, and test for.

Humans have a hard time wrapping their heads around parallel flows. That being said, [1] is a good starting point for a deeper understanding of these basic concurrency problems.

Implementation styles

The basic premise of concurrency is the interleaving of tasks (see figure 2). In order to accomplish this on only one computer, some foundation must exist through which such interleaving becomes possible. A construct called a thread was thus developed.

A thread is the smallest unit of execution that can be managed by a hardware platform or operating system. From a physical point of view, one hardware thread corresponds to one CPU core. At the operating system level, threads (also known as software threads) are managed by a scheduler, a mechanism that executes several threads alternately on the same hardware thread.

This is how we can run a very high number of programs concurrently on the same computer: each program corresponds to at least one thread. These threads then share a small number of hardware threads, taking turns to make progress. If this mechanism for alternating between threads did not exist, we could only run as many concurrent programs as the computer has CPUs.

The alternation mechanism can be pre-emptive or cooperative. The former means that the scheduler is free to decide for how long it will allow a thread to run on a hardware thread, whereas the latter means that threads themselves decide when they are ready to yield CPU time. (Can you see how cooperative systems can be led into starvation?) The difference between pre-emption and cooperation is key when reasoning about concurrency.
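Kotlin's coroutines, covered below, give a feel for the cooperative style. In this sketch (our own, not from the article), two coroutines share runBlocking's single thread and take turns only because each voluntarily calls yield():

```kotlin
import kotlinx.coroutines.*

// Two coroutines cooperating on runBlocking's single-threaded event loop.
fun interleave(): List<String> = runBlocking {
    val order = mutableListOf<String>()
    val a = launch { repeat(3) { order += "A"; yield() } } // hand over the thread
    val b = launch { repeat(3) { order += "B"; yield() } }
    joinAll(a, b)
    order
}

fun main() {
    // Each coroutine runs until its yield(), so the two alternate strictly.
    println(interleave())
}
```

If either coroutine never yielded, the other could not run until the first had finished: that is precisely how a cooperative system slides into starvation.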

Some platforms already support fibers, which can be seen as software threads that are scheduled cooperatively on top of the operating system threads: a second level of scheduling, or a higher level of abstraction, so to speak. Fibers came into existence because they need less memory to save their state, and switching from one to the next takes less time than switching between threads. Upcoming versions of Java will support fibers.

Kotlin is a language for the JVM which brings coroutines to the equation. Coroutines are a language construct which allows us to build lightweight concurrent tasks running cooperatively, much like fibers.

Concurrency in Kotlin

Of course, now one question begs to be asked: what are coroutines?

We are all familiar with the concept of a routine, also called a function or a method, and subroutines mean much the same thing in our everyday work. But coroutines? They are not the same thing, yet they are a deceptively simple concept: coroutines are routines that cooperate with their execution environment and with each other.

This cooperation means that one coroutine can allow the control flow to be transferred to another coroutine without ending the first, potentially even switching between execution contexts. Let that sink in for a moment. This unusual definition means that a call from one coroutine to another may cause the original control flow to be suspended, only to resume later, maybe on a different thread.

Suspension in coroutines bears some similarities to suspension in threads. A thread becomes frozen in time and space once it is suspended. Then the OS scheduler selects another thread to unfreeze and resume execution, then suspends it, selects another, and so on until our first thread is selected to resume execution. At this point, it continues as if nothing had happened. The same concept applies to coroutines.

The big difference lies in the mechanics for suspending and resuming.

First of all, suspension in coroutines can only happen when another coroutine is being called, whereas threads can be suspended at any time. This is why we said earlier that a coroutine could “allow” the transfer of the control flow: a coroutine can never suspend if it does not call other coroutines.
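This rule is easy to observe. In the sketch below (our own example), two coroutines run on runBlocking's single thread; without a suspension point they execute back to back, while with a yield() call between steps they interleave:

```kotlin
import kotlinx.coroutines.*

// No suspending calls inside: this coroutine can never be suspended mid-way.
suspend fun busy(tag: String, log: MutableList<String>) {
    repeat(2) { log += "$tag-$it" }
}

// The same work with an explicit suspension point after each step.
suspend fun polite(tag: String, log: MutableList<String>) {
    repeat(2) { log += "$tag-$it"; yield() }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    joinAll(launch { busy("x", log) }, launch { busy("y", log) })
    println(log) // x runs to completion before y: [x-0, x-1, y-0, y-1]
    log.clear()
    joinAll(launch { polite("x", log) }, launch { polite("y", log) })
    println(log) // hands over the thread at each yield(): [x-0, y-0, x-1, y-1]
}
```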

Unlike traditional thread-based concurrency, there is no one-to-one relationship between coroutines and threads. So, the thread that was executing a coroutine that got suspended can move on to execute another coroutine. Coroutines are one level of abstraction higher than threads.

All this also means that calls to a coroutine will not necessarily restart its execution, as execution may continue from somewhere in its middle. It all depends on the state of the execution flow. That is right. Coroutines are stateful!

Writing coroutines

The Kotlin language itself supports coroutines, so the compiler and the runtime do all the work necessary to preserve and pass along the state.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App {
  private static final Logger LOG = LoggerFactory.getLogger(App.class);

  public static void main(String[] args)
      throws ExecutionException, InterruptedException {
    CompletableFuture.allOf(
        CompletableFuture.runAsync(() -> LOG.info("I'm a teapot.")),
        CompletableFuture.runAsync(() -> LOG.info("I'm a coffee pot."))
    ).get();
    LOG.info("No more pots.");
  }
}

listing #1

import kotlinx.coroutines.*
import org.slf4j.LoggerFactory.getLogger

fun main() {
  val log = getLogger("main")
  runBlocking(Dispatchers.Default) {
    launch {
      log.info("I'm a teapot.")
    }
    launch {
      log.info("I'm a coffee pot.")
    }
  }
  log.info("No more pots.")
}

listing #2

Compare the Java code in snippet 1 with the Kotlin code in snippet 2. Java has become better with CompletableFuture since version 8, but Kotlin is a bit more elegant, right? Coroutines were designed to streamline the expression of concurrency in our code.

As we can see when running them, the results of both code snippets are equivalent. Both the Java lambdas and the Kotlin coroutines ran concurrently and in different threads. Let us see how we can work with Kotlin’s coroutines in practice.

We define a coroutine by using the suspend keyword in its declaration:

suspend fun doSomething(): Unit

This keyword allows the compiler to handle such functions differently. They now need to abide by the rules of coroutines. Rule number one: a coroutine may only be called from within a coroutine scope. Or in other words, from another coroutine.

It may seem like an impossible task. After all, a Kotlin program starts at the main() method, which is a regular subroutine, right? It turns out that there are constructs that allow us to create an initial coroutine scope.

runBlocking() comes to the rescue. It will take a lambda argument and run it within a coroutine. It will also not exit until the coroutine has completed. This is an essential detail because it is possible to schedule the execution of coroutines in different threads. As such, it is necessary to ensure that the main application thread does not exit until our coroutines have finished; otherwise, the application would terminate too soon.
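Putting the two rules together, a minimal bridge from the regular world into the coroutine world looks like this (the function name is our own):

```kotlin
import kotlinx.coroutines.*

// A coroutine: it may only be called from inside a coroutine scope.
suspend fun doSomething() {
    delay(100) // a real suspension point
    println("done")
}

fun main() {
    // main() is a regular function; runBlocking() creates the initial scope
    // and blocks the main thread until every coroutine inside has completed.
    runBlocking {
        doSomething()
    }
    println("after runBlocking") // printed only after "done"
}
```

Calling doSomething() directly from main(), outside runBlocking(), would not even compile: the compiler enforces rule number one for us.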

Making coroutines concurrent

What about concurrency, then? Earlier, we mentioned concurrency and that coroutines make it easier to write concurrent code. We must keep in mind that just calling coroutines does not make them concurrent. Even though it is true that each call site of a coroutine is a suspension point, the execution flow is still sequential unless stated otherwise. Therefore, we must be explicit and say that we want a given coroutine to run concurrently. We use launch() to do so. Like runBlocking(), it will run a lambda function in a coroutine, but it will not block until its completion. Instead, it launches a child coroutine. The coroutine that called launch() will not complete until all its children do.
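The difference is easy to measure. In this sketch (ours), three suspending steps of roughly 100 ms each take about 300 ms when called in sequence, but only about 100 ms when each is launched as a child coroutine:

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun step() = delay(100)

fun main() = runBlocking {
    // Plain calls: each call site is a suspension point,
    // but the flow is still one sequential computation.
    val sequential = measureTimeMillis { repeat(3) { step() } }

    // launch(): each step becomes a concurrent child coroutine, and the
    // enclosing scope completes only after all of its children do.
    val concurrent = measureTimeMillis {
        coroutineScope { repeat(3) { launch { step() } } }
    }
    println("sequential: ${sequential}ms, concurrent: ${concurrent}ms")
}
```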

All of this becomes clearer if we look back at code snippets 1 and 2. In Java, we had to list explicitly all the futures we had created in order to wait for their results. The Kotlin version with coroutines is different. It automatically manages all calls made to concurrent coroutines within the runBlocking() scope.

Coroutine invocations using launch() return an instance of an object implementing Job. This interface represents the running state of a coroutine in Kotlin, much like Future represents the running state of an asynchronous task in Java. It is possible to join Jobs just like Futures. However, Job does not allow the caller to consume values returned by a coroutine. Fire-and-forget coroutines have their uses, but coroutines only achieve their full potential as functions when they can return values to their callers.
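A small sketch of a Job in action (our own example):

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    // launch() returns a Job: a handle on the coroutine's lifecycle.
    val job: Job = launch {
        delay(100)
        println("worker finished")
    }
    println("active: ${job.isActive}")       // the coroutine is running
    job.join()                               // suspend until it completes
    println("completed: ${job.isCompleted}")
    // Note: a Job carries no result; there is no way to read a return
    // value through it, which is where Deferred comes in.
}
```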

There is another interface that extends from Job, called Deferred. This interface contains the await() method, which we use to retrieve values returned by coroutines. We obtain instances of Deferred when async() is used instead of launch().

Now we can start coroutines concurrently and work with the values they return. We demonstrate this in code snippet 3.

import kotlinx.coroutines.*

suspend fun one(): Int = 
  withContext(Dispatchers.Default) { 
    // pretend this is computationally intensive 
    42 
  } 
 
suspend fun two(): Int = 
  withContext(Dispatchers.IO) { 
    // pretend this is an I/O operation 
    1 
  } 
 
fun main() = runBlocking { 
 
  val callOne = async { one() } 
  val callTwo = async { two() } 
 
  println(callOne.await() + callTwo.await()) 
 
} 

listing #3

Coroutines still need threads

The mechanism that supports coroutines is not magic. Remember that just like any other code, coroutines need threads to run on: just because we are not creating them ourselves, that does not mean that they do not exist.

Entities known as dispatchers manage the threads allocated to the execution of coroutines. They keep track of which coroutines are running, together with their states, and assign coroutines to threads as necessary. They behave much like thread pools dedicated to coroutines.

Kotlin provides several built-in dispatchers, each with its own purpose: one meant for I/O tasks, another that ensures coroutines run on the thread associated with the user interface, a pre-built thread pool, among others. Consult the Kotlin documentation for more details about them, since some are platform-specific.

It is possible to assign coroutines to specific dispatchers, and it is often recommended to do so. We can specify the dispatcher as an argument to the launch(), async(), and withContext() functions. Once a dispatcher is defined, by default or explicitly, it will be used by subsequent launch() and async() calls unless these specify a different dispatcher.
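The sketch below (ours) shows both behaviours: the first launch() inherits the dispatcher chosen by runBlocking(), while the second overrides it for that one coroutine only:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking(Dispatchers.Default) {
    // No dispatcher given: inherits Dispatchers.Default from the outer scope.
    launch {
        println("inherited:  running on ${Thread.currentThread().name}")
    }
    // Explicit dispatcher: this coroutine alone moves to the I/O pool.
    launch(Dispatchers.IO) {
        println("overridden: running on ${Thread.currentThread().name}")
    }
}
```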

Code snippet 3 demonstrates usage of withContext(). In this example, each coroutine ran in a different dispatcher. The caller code got suspended in each invocation of await() until the respective coroutine finished.

Looking back at code snippet 2, we see that runBlocking() determined the dispatcher to use. In the example, it was necessary to define a dispatcher explicitly to ensure that the outcome would be as displayed: each coroutine running in a thread other than the main thread. Had it been left unspecified, runBlocking() would have run its coroutines on the calling thread, each one keeping that thread until it suspends. Because the coroutines in the example never suspend (the logging operation is blocking), only the main thread would have been used.

Regardless of the function utilised to change the dispatcher, the chosen dispatcher will remain in scope and will be used for new coroutine invocations until it is changed again later.

Dispatcher management can become as complicated as thread pool management. A useful piece of advice is to use withContext() to specify the desired dispatcher. Of course, that would be overkill to do in every coroutine. So, we should use our best judgment and always remember to ask ourselves if the coroutine we are writing can run in any dispatcher or should be constrained to a specific dispatcher.

We may create our own dispatchers, for example when we want to manage the resources used by a set of coroutines, or when these need to be segregated from other coroutines due to their runtime characteristics. Usually, this is done by creating an ExecutorService; the dispatcher is then obtained through the asCoroutineDispatcher() Kotlin extension function. No further management on our part is required, except that we must remember to shut down the executor. See code snippet 4 for an example.

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory.getLogger

fun main() { 
  val executor = createExecutor() 
  try { 
    runBlocking(executor.asCoroutineDispatcher()) { 
      getLogger("sample").info("I'm running in a custom dispatcher.") 
    } 
  } finally { 
    executor.shutdown() 
  } 
} 
 
fun createExecutor(): ExecutorService = 
  Executors.newSingleThreadExecutor { 
    val thread = Thread(it) 
    thread.name = "custom-single-thread" 
    thread 
  } 

listing #4

Error management

Error management is an essential part of programming. It is relatively simple to reason about error flows in sequential code. However, it can be challenging to foresee all possible error flows once we start to use concurrent patterns.

For example, if we launch three concurrent tasks for one computation and one of them fails, we need to ensure that all of them have completed before failing the entire computation. Some or all of the others may fail too, even at different moments in time, and we must handle all these possible failures. We may even want to cancel the tasks that have not failed instead of waiting for them to finish, since we already know that the computation as a whole will fail. All this management adds complexity that is foreign to the original purpose of the computation.

Structured concurrency is a paradigm whose purpose is to simplify the coordination of concurrent execution flows by enclosing them into execution scopes. These scopes are then handled automatically, terminating only when all enclosed concurrent execution flows reach their completion. In case an error occurs in one of them, then all others are cancelled.

import kotlinx.coroutines.*

fun main() = runBlocking { 
 
  suspend fun one(): Int { 
    println("Answering the most important question in the universe...") 
    delay(1_000) 
    println("... done!") 
    return 42 
  } 
 
  suspend fun two(): Int { 
    println("Finding the question...") 
    throw ArithmeticException() 
  } 
 
  try { 
    coroutineScope { 
 
      val first = async { one() } 
      val second = async { two() } 
 
      println(first.await() + second.await()) 
 
    } 
  } catch (e: Exception) { 
    println("Coroutines have thrown ${e::class.java}") 
  } 

} 

listing #5

We want to simulate two non-blocking calculations in code snippet 5. Unfortunately, in this simulation, one of them fails with an exception. The runtime library cancels both coroutines after the failure. Even the coroutine that did not fail is not allowed to run to its completion. The exception is propagated to the code that called await(), so it can handle the error appropriately.

Not only does this make it easier to coordinate the execution of separate concurrent flows, but it also eases the cognitive burden of managing cancellations and failure cases explicitly. Without a structured approach, the overhead introduced by error and cancellation handling leads to quite a bit of boilerplate code that distracts us from the real purpose of the computation. Structured concurrency eliminates most of this added complexity.

Is this all?

We do not intend to present an exhaustive description of coroutines and concurrency in this article. Much remains to be said about them. Instead, we hope to have shown that Kotlin has allowed concurrent code to become more readable by offering first-class support for coroutines. Form no longer obscures the intent. Happy concurrent coding!

Additional reading

[1] https://web.mit.edu/6.005/www/fa14/classes/17-concurrency/

[2] https://en.wikipedia.org/wiki/Dining_philosophers_problem

[3] https://docs.oracle.com/javase/tutorial/essential/concurrency/

[4] Alst, Paulien van (2020), Kotlin - Hype of een nieuwe speler op de markt?, Java Magazine, 01 2020, pp 22-24

[5] Subramaniam, Venkat (2019), Programming Kotlin, Pragmatic Bookshelf

[6] Kotlin documentation on coroutines, https://kotlinlang.org/docs/reference/coroutines-overview.html