Background Manager #

Pragma Engine has an object called BackgroundManager that wraps up common functionality and handles tasks meant to be run in the background.

There are three main kinds of tasks that the BackgroundManager handles:

  • tasks that need to be run periodically in the background
  • tasks run in parallel in the background where we don’t need answers
  • tasks that run when a channel receives an element to process

Background tasks #

There are often situations where we need work to be spun off and run, but we don’t need to wait for it to finish and we don’t need results.

Examples include:

  • sending a request to another service without needing an answer
  • writing some data to a metrics engine

Let’s take a look at how Pragma Engine functionality can handle this type of task.

Fire and Forget #

BackgroundManager.fireAndForget() runs code in the background without blocking the active code.

fireAndForget() runs code on the coroutine scope owned by BackgroundManager. Since BackgroundManager handles launching this block, the caller doesn’t need to be a suspend function, and fireAndForget() returns immediately after launching code. The code passed in is a suspend block so it can run in a coroutine. We won’t be able to return any values from the block.

Example

This example fires off a piece of work that doesn’t block active code.

fun foo():String {
    //fire off a piece of work but don’t block
    backgroundManager.fireAndForget { doSomeWork() }
    //this returns right away without waiting for doSomeWork()
    return "task farmed out"
}

suspend fun doSomeWork() {
    someObject.longRunningWork()
    anotherObject.doSomeStuff()
}

Fire and Forget All #

BackgroundManager.fireAndForgetAll() runs a collection of tasks in the background in parallel.

This method takes a list of elements of type T and a functor that takes an argument of type T and launches one coroutine for each element in the list.

Example

In this example, foo() returns right away without blocking, and BackgroundManager launches one coroutine for each element in the request list, and calls sendRequest with the element passed in as an argument. These coroutines are executed in parallel.

fun foo(requests: List<Request>) {
    backgroundManager.fireAndForgetAll(requests, ::sendRequest)
}
    
suspend fun sendRequest(request: Request) {
    this.sendRequest(request)
}

Fire and Forget IO #

BackgroundManager.fireAndForgetIO() runs tasks that may block for long periods of time.

If we have long-running tasks that block threads (note the difference between blocking threads and suspending coroutines), we may run out of worker threads to service other coroutines on the Pragma Engine thread pool. fireAndForgetIO() is designed to handle work that may block threads by servicing them with Kotlin’s Dispatchers.IO CoroutineDispatcher. This is critical to prevent locking up Pragma Engine threads, and to allow work to continue smoothly.

Example

In this example, we use fireAndForgetIO to access the file system.

backgroundManager.fireAndForgetIO {
    while (isRunning) {
        directoryWatchService.pollEvents().forEach { filename ->
            if (filesToWatch.contains(filename)) channel.send(filename)
        }
    }
}

Run Forever #

BackgroundManager.runForever() schedules tasks to run periodically until BackgroundManager is shut down.

This function takes a functor to repeat and a delay in milliseconds between invocations of the functor. If the functor throws an exception, Pragma Engine logs the error and continues to invoke the functor on the delay. runForever() can’t return values, and are useful for polling tasks.

Example

This example is a simple function that sends a ping every 2000 milliseconds.

fun foo() {
    backgroundManager.runForever(::ping, 2000)
}

Actors #

When using channels, we need a background task running that deals with data sent to the actor. Pragma Engine has wrapped this boilerplate into two functionalities: parallelActor() and serialActor().

Parallel actor #

BackgroundManager.parallelActor() handles data sent to a channel.

Within a fireAndForget(), each element on the channel is processed using the passed in block of code on its own fireAndForget(). This results in as many concurrent processes as there are available elements.

Use case: This can be useful when updating independent states and can be faster because the transactions do not have to wait on each other.

Example

In this example, we use parallelActor() to read messages from a channel in parallel.

fun foo(channel: Channel<SomeMessage>) {
    backgroundManager.parallelActor(channel, ::readMessage)
}
fun readMessage(msg: SomeMessage) {
    // parse the message and update some stuff
}

Serial actor #

BackgroundManager.serialActor() handles data sent to a channel while preserving serial processing.

Within a fireAndForget(), each element on the channel is processed separately using the passed in block of code until completion before the next element is processed.

Use case: This can be useful when updating state is not thread-safe or it is important to preserve transaction ordering.

Example

In this example, we update the most recent message with serialActor() to guarantee that setMostRecentMessage() is called with elements in the order they’re received on the channel.

fun bar(channel: Channel<SomeMessage>) {
    backgroundManager.serialActor(channel, ::setMostRecentMessage)
}
fun setMostRecentMessage(msg: SomeMessage) {
    // keep track of the most recent message
}

Cancellations #

Kotlin cancels coroutines by throwing a CancellationException.

There are two CancellationExceptions in Kotlin: one in kotlinx.coroutines and one in kotlin.coroutines.cancellation. Both are type aliased to java.util.concurrent.CancellationException.

This applies to all blocks and functors passed into BackgroundManager, so if you have a task that needs to clean up after itself, you need to catch and handle CancellationExceptions.

If shutdown() is called on a BackgroundManager, all running and pending tasks are sent CancellationExceptions.

See the Kotlin Cancellation and exceptions documentation for more information.

Example

In this example, we call fireAndForgetIO() on a task, catch a potential CancellationException, and then clean up.

fun foo() {
    backgroundManager.fireAndForgetIO { doADatabaseThing() }
}

suspend fun doADatabaseThing() {
    try {
        database.doQuery()
    } catch (e:CancellationException) {
        database.rollbackQuery()
    }
}