
Kotlin for Java developers: Concurrency with coroutines

The ability to perform multiple operations simultaneously is one of the most important aspects of any programming language. Orchestrating multiple paths of execution is inherently complex and there are various approaches to taming this complexity. Kotlin is a JVM language growing in popularity for its clean union of functional and object-oriented paradigms. In my previous article, I introduced Kotlin’s coroutines as a powerful feature for handling concurrency. Now we’ll take a closer look at writing concurrent programs using coroutines.

Understanding Kotlin’s coroutines

Like concurrency in general, the basics of Kotlin’s coroutines are easy to understand, but you have to keep a firm footing and move gradually: things can get complex fast. Kotlin’s coroutines live in the kotlinx.coroutines package, which covers a wide terrain, from simple blocking subroutines to sophisticated reactive pipelines.

Coroutines are an abstraction over threads, similar to virtual threads in Java. Rather than controlling threads directly, with coroutines you work with lightweight constructs that the runtime schedules onto threads for you. The result is the potential for better performance—sometimes radically better—compared with using traditional threads. In essence, you declare the parts of your code that are “suspendable” and the engine determines how best to orchestrate them under the hood.

Syntactically, the idea behind coroutines is to provide a way to manage concurrency with synchronous-looking code blocks. This is done by using coroutine scopes. Spawning coroutines is only allowed within a scope. The scope defines the behavior of the coroutines within it, and every coroutine launched within a scope is tracked by that scope until it completes, regardless of how it concludes (even if it errors out).

Declare a blocking scope

The most basic kind of scope is a blocking scope, which is obtained using the runBlocking function. This kind of scope tells the platform to block the current thread until all the coroutines within it are finished. It’s usually used at the top level of an application, in the main function, to ensure the program doesn’t complete before its subroutines are done with their work. (But note that it isn’t necessary to use runBlocking in Android because that platform uses an event loop.)

Here’s a common way to declare a blocking scope in your code:


import kotlinx.coroutines.*

fun main() = runBlocking {
  // do concurrent work
}

This syntax is an interesting part of Kotlin. It defines main as a single-expression function whose implementation is a call to runBlocking (from the kotlinx.coroutines library), with the code block in curly braces passed as the final argument (known as a trailing lambda). Whatever is defined in the body of the braces is executed in the blocking coroutine scope created by runBlocking.
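
If the trailing-lambda shorthand is unfamiliar, here is a minimal sketch of the same program written out the long way, with an ordinary block body and the lambda passed to runBlocking’s block parameter by name:


import kotlinx.coroutines.*

// The single-expression form above is shorthand for an ordinary block body,
// with the lambda passed to runBlocking's block parameter by name.
fun main() {
    runBlocking(block = {
        // do concurrent work
    })
}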

Run a task with the launch function

Now let’s say we want to run a task inside our blocking main scope. A common approach is to use the launch function found in kotlinx.coroutines:


import kotlinx.coroutines.*

fun main() = runBlocking {
    println("start main")
    launch {
      println("start launch 1")
      delay(1000)
      println("end launch 1")
    }
    //println("between launches (main)")
    println("end main")
}

Running this produces the following output:


start main
end main
start launch 1
end launch 1

This output shows us that the main function executes to its end, then waits while the launch block runs concurrently. The launched coroutine pauses for a second and then completes. This is the delay function at work: it suspends the coroutine for the specified number of milliseconds without blocking the underlying thread.

Now imagine that instead of a delay, we are executing a long-running network request, disk operation, or calculation. In this case, we could use a coroutine dispatcher. Dispatchers are used to fine-tune how concurrent jobs are assigned to threads. The default dispatcher works well to start.
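
As a rough sketch of what that looks like, a dispatcher can be passed directly to launch; without one, a coroutine inherits the context of its surrounding scope (here, the thread that runBlocking is blocking):


launch(Dispatchers.Default) {  // run on a shared pool sized for CPU-bound work
    delay(1000)                // stand-in for a long-running calculation
}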

Dispatchers and delays

Let’s expand our example to spawn two different jobs:


fun main() = runBlocking {
    println("start main")
    launch {
      println("start launch 1")
      delay(1000)
      println("end launch 1")
    }
    launch {
      println("start launch 2")
      delay(500)
      println("end launch 2")
    }
    println("end main")
}

Now we have two tasks launched, and running the above code produces output like so:


start main
end main
start launch 1
start launch 2
end launch 2
end launch 1

As you would expect, the faster job (with a delay of 500 milliseconds) completes before the slower one (1,000 milliseconds). Both complete after the main function is done.

Now let’s extract our functionality into a function. Say we want a function that accepts the number of milliseconds to wait (in real life, this could be a call to an API endpoint), and we’ll call it twice with two different arguments (1000 and 500) to reproduce our previous example in a more flexible form. Using coroutines, the simplest way to do this is:


import kotlinx.coroutines.*

suspend fun launchTask(delayMillis: Long) {
    println("START task $delayMillis")
    delay(delayMillis)
    println("END launchTask $delayMillis")
}

fun main() = runBlocking {
    println("start main")

    launch {
      launchTask(1000)
    }
    launch {
      launchTask(500)
    }

    println("end main")
}

Running the above code produces output equivalent to the last example (with slightly different messages), but now we have a reusable launchTask function. Notice launchTask is prefixed with the suspend keyword. Without that, the engine will not understand that the function supports being “suspended”—meaning it is paused to allow other work to occur—and the compiler will reject the call to delay().
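
Put another way, a suspend function can only be called from another suspend function or from inside a coroutine builder such as launch. Here’s a minimal sketch (the fetchGreeting name is purely for illustration):


// A suspend function is free to call other suspend functions such as delay().
suspend fun fetchGreeting(): String {   // hypothetical helper, for illustration only
    delay(100)       // suspends this coroutine without blocking the thread
    return "hello"
}

// fun broken() = fetchGreeting()       // won't compile: no suspend context here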

Notice also that we had to use two launch blocks. If we used one launch with both launchTask calls, the calls would happen sequentially, as sketched below.
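
Here’s a quick sketch of that sequential variant: with a single launch, the second call doesn’t start until the first finishes, so the block takes roughly 1,500 milliseconds instead of about 1,000:


launch {
    launchTask(1000)
    launchTask(500)   // begins only after the first call completes
}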

Contexts and cancellations

Now let’s incrementally increase the sophistication of our concurrent logic. If we wanted to support the ability to cancel the two tasks, we could tie them together in a shared scope with its own context, and use the scope’s cancel() method:


import kotlinx.coroutines.*

suspend fun launchTask(delayMillis: Long) {
    println("START task $delayMillis")
    delay(delayMillis)
    println("END launchTask $delayMillis")
}

fun main() = runBlocking {
    println("start main")

    val scope = CoroutineScope(Dispatchers.Default)

    scope.launch {
        launchTask(10000)
    }
    scope.launch {
        launchTask(500)
    }

    // Cancel all coroutines in the scope after 2 seconds
    delay(2000)
    scope.cancel()

    println("end main")
}

Here we explicitly create a CoroutineScope and use it to launch our two suspending function calls, this time explicitly using the default dispatcher. With the handle to the scope, we can start our jobs and then cancel them all with scope.cancel(). Notice that we have two tasks, one with a delay of 10,000 milliseconds. Because we cancel after 2,000 milliseconds, we get the following output:


start main
START task 500
START task 10000
END launchTask 500
end main

So, the 10,000-millisecond task was started but never completed. Instead, it was canceled along with its enclosing scope.

For another degree of sophistication, we can add a withTimeout block:


fun main() = runBlocking {
    println("start main")

    withTimeout(5000) {
        launch {
            launchTask(10000)
        }
        launch {
            launchTask(500)
        }
    }

    println("end main")
}

This block behaves similarly to the previous example in that it cuts short the 10,000-millisecond job. But in this case an exception will be thrown. Here’s how to handle the exception gracefully:


try {
  withTimeout(5000) {
    launch {
      launchTask(10000)
    }
    launch {
      launchTask(500)
    }
  }
} catch (e: TimeoutCancellationException) {
  println("Timeout occurred: ${e.message}")
}

We’ll get the following clean output:


start main
START task 10000
START task 500
END launchTask 500
Timeout occurred: Timed out waiting for 5000 ms

Now imagine we have our two jobs and one of them makes network calls, which are IO-bound operations. We can pass a specific dispatcher to launch:


launch(Dispatchers.IO) {
    launchTask(10000)
}

We can also cancel jobs individually:


val scope = CoroutineScope(Dispatchers.Default)

scope.launch {
    launchTask(10000)
}
val job = scope.launch {
    launchTask(500)
}

job.cancel()   // cancel the specific job

delay(2000)
scope.cancel()

We’d get the following output:


start main
START task 10000
end main

This works because the delay() function is cancellable. In more complex scenarios, you’d have to implement cancellation support yourself. As the Kotlin documentation notes, cancellation is cooperative, so you have to ensure your code is cancellable. This can be done using the isActive property:


val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (isActive) { // cancellable computation loop
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}

When a cancel call is received, the isActive property resolves to false and the while loop allows the job to exit.
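
To round out that example, the caller can request cancellation after letting the loop run for a while; cancelAndJoin cancels the job and then waits for it to finish. A minimal sketch continuing the code above:


delay(1300L)          // let the computation loop run for a bit
job.cancelAndJoin()   // request cancellation and wait for the loop to exit
println("main: the job is now cancelled")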

Communication with channels

Coroutines support channels as a clean way to pass data between running coroutines. Here’s an example:


import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
  val channel = Channel<Int>() // A communication channel for Ints

  val producer = launch {
    repeat(5) { i ->
      delay(1000) 
      channel.send(i) 
    }
  }

  val consumer = launch {
    repeat(5) {
      val message = channel.receive() 
      println("Received: $message")
    }
  }
}

This allows the producer job to send messages over the channel object and the consumer to listen for them, printing them out as they are received. This is a very simple mechanism for sharing data between concurrent contexts.
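
A common variation, sketched below, is to close the channel when the producer is finished and let the consumer iterate until the channel is closed, rather than counting on a fixed number of messages:


import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
  val channel = Channel<Int>()

  launch {
    repeat(5) { i ->
      channel.send(i)
    }
    channel.close() // signal that no more elements are coming
  }

  // Receives values until the channel is closed
  for (message in channel) {
    println("Received: $message")
  }
}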

Reactive programming with flows

Now let’s ratchet up the sophistication another notch using flows. These are functional event streams that give you a kind of reactive programming framework. Here’s a simple example:


import kotlinx.coroutines.*
import kotlin.random.*
import kotlinx.coroutines.flow.*

fun randomNumbers(count: Int): Flow<Int> = flow {
    for (i in 1..count) {
        emit(Random.nextInt()) // Emit a random integer
        delay(500) // Simulate some work
    }
}

fun main() = runBlocking {
    randomNumbers(5)
        .collect { value -> println("Received: $value") }
}

This code creates a function called randomNumbers that returns a Flow of Ints. When we collect it, its body uses the emit() function to produce each value. This is like a stream in Java, and the .collect() call is the terminal operation. This lets us create composable, reactive pipelines out of flows. This programming model yields an immense amount of flexibility and power.

For example, if we wanted to add another step in the stream by doubling the numbers, we could create another functional operator:


fun doubledNumbers(numbers: Flow<Int>): Flow<Int> = numbers.map { it * 2 }

and add it to our pipeline:


fun main() = runBlocking {
    val randomFlow = randomNumbers(5)
    val doubledFlow = doubledNumbers(randomFlow)
    doubledFlow.collect { value -> println(value) }
}

Conclusion

This has been a quick cruise through some of the most interesting parts of Kotlin’s concurrency model using coroutines. The foundation is easy to understand, and you can layer many more complex and higher-level capabilities on top of it. It’s notable that Kotlin includes only minimal concurrency primitives in the language itself (essentially the suspend keyword), relying mainly on functions from kotlinx.coroutines and its subpackages to deliver the rest of the functionality. This keeps things flexible and amenable to elaboration by applications and library code.

Overall, Kotlin’s concurrency support is impressive and well thought out. It’s not hard to understand why the language is a popular alternative to Java.
