Rust tokio task cancellation patterns

Update: 19/04/2024: read at the end of the post for more info.

I have been trying to pick up Rust again recently. It’s been a bit of a slow burn at the beginning but I think I’m finally starting to feel the compounding effects kicking in. Maybe it’s just my brain playing a trick on me, but I’m feeling much more at ease when writing Rust now than I was a few weeks back.

I dabbled with Rust in the past but never built anything “substantial” which is where the biggest learning usually happens because you get to face issues that don’t tend to surface on smaller-scale projects. This is not to say that building small projects is useless! On the contrary! They’re an excellent way to get your feet wet with any new language! It’s just that the small projects usually don’t highlight the lack of knowledge in some areas of language you didn’t need when building them.

One of those areas for me was concurrency. Rust provides support for async programming which lets you spawn asynchronous tasks which are then scheduled for execution across OS threads via a runtime executor. As with everything in Rust, async programming has to be memory-safe so you need to make sure the borrow checker is happy. Which is a proper challenge at times.

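To make things a bit weirder in Rust land, whilst async programming is a first-class citizen of the language, the standard library only ships the basic building blocks (async/await and the Future trait) and no runtime to execute them, or at least that is how it seems to an untrained beginner’s eyes like my own. Even async_std, despite its name, is a third-party crate rather than part of the standard library.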

Turns out people tend to use the tokio crate instead, which provides a wealth of features and has a really great tutorial. Most of what I know about Rust async programming I learnt by following the tokio tutorial.

One thing the tutorial doesn’t cover in much detail is task cancellation, which is something I needed in one of my projects. Specifically, I needed to spin up a bunch of worker tasks and I needed a way to cancel either any or all of them. I did a bit of research and hacking on the side and came up with a few patterns I wanted to put somewhere where I could both easily access them and share them with anyone who might find them useful.

My intention in this blog post and the accompanying GitHub repo is to document these patterns and I expect this post to be more of a living document that will receive continuous updates based on the new tokio tricks I’m hoping to learn in the future. As I said, I’m a major Rust noob at the moment so these “patterns” might be completely bogus – if you find them to be so, please leave a comment at the end of the post and I’ll be happy to update it.

Select and Channels

At the heart of all of these patterns are two tokio features (assuming you are already familiar with spawning tasks):

  • channels: used for inter-task communication
  • select: used for waiting on multiple async computations (not necessarily tasks!)

At first sight they seem eerily similar to Go channels and select but the devil lies in the details. Go offers a single channel construct, the semantics of which (send/receive) are “tuned” via optional direction annotations on the channel type (chan<- for send-only, <-chan for receive-only). These are checked by the compiler only when explicitly written out; unfortunately this is easy to forget and, more often than not, simply omitted by many Go programmers. Tokio channels are seemingly a bit more complex, but at the same time more powerful as far as memory safety and resilience of your program are concerned.

Tokio channels create two distinct objects for communicating between tasks: a sender and a receiver. You can’t use a single object for both receiving and sending. This prevents a whole class of problems I have faced many times when programming in Go, such as when you accidentally omit the optional channel direction and send data to a channel that was supposed to be receive-only – the Go compiler sees nothing wrong with it, but you spend hours debugging why things are not working.

Another interesting feature from a Go programmer’s point of view is that you can create multiple-producer, single-consumer channels – referred to via the mpsc acronym – which prevent another class of problems I’ve faced in Go programs: multiple receivers modifying the same object, thus causing very sad data races which can be hard to find. In Go you can happily send a pointer down a channel to multiple consumers and make a promise to yourself that it’s either read-only or you’ll never try modifying it; the problem is, the Go compiler does not enforce these things – this is literally just some sort of convention/unwritten rule you are hoping others who use your code will follow. Having a compiler that can help you enforce this is pretty handy and fits well with Rust’s memory safety mantra. Actually, the story of concurrent type safety is a bit more interesting and involves discussing the Send and Sync traits, but this post is about task cancellation patterns, so I’ll leave it to you to dig into these traits.

If you do need to send many values to many consumers Tokio does have your back, worry not: tokio provides the broadcast channel.

In general, there are actually four types of channels provided by Tokio:

  • mpsc: multiple producers single consumer, described above
  • oneshot: for sending and receiving a single value; once sent, the channel is closed
  • broadcast: described above, multiple producers, multiple consumers
  • watch: single producer, multiple consumers

As I said these are nicely documented and discussed in the tokio tutorial. It took me a bit of time to digest these concepts as I naturally tend to compare them to their Go language equivalents but they’re subtly different. But anyway, let’s move on to the actual cancellation patterns that leverage select and channels.

The following is an incomplete list of the patterns I came up with. Some of these might feel a bit contrived to the experienced Rust programmer’s eye, but they worked rather well for me. As I said, I’m still a Rust n00b so I’m likely doing something wrong. Here goes!

Dropping JoinHandle DOES NOT CANCEL THE TASK

NOTE: colorfulchew correctly pointed out on Reddit that using drop does not actually cancel the task, and neither does letting the handle go out of scope (see the second example below); you must use abort() instead. abcSilverline mentioned that I misread the official documentation that explicitly says:

A JoinHandle detaches the associated task when it is dropped, which means that there is no longer any handle to the task, and no way to join on it.

Every time you spawn a task in tokio you get back a JoinHandle. You can use the join handle to await the task’s completion, but I thought you could also use it to forcefully terminate the task by simply dropping it. Here’s a silly example:

use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // do some work
        tokio::time::sleep(Duration::from_secs(10)).await;
        println!("Task completed");
    });

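    // Try to "cancel" the task after 100 milliseconds by dropping the handle.
    // This only detaches the task; it keeps running in the background.
    time::sleep(Duration::from_millis(100)).await;
    drop(handle);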

    println!("Task was cancelled");
}

I also wrongly assumed I could do this:

use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    {
        let handle = tokio::spawn(async {
            // do some work
            tokio::time::sleep(Duration::from_secs(10)).await;
            println!("Task completed");
        });

        // The handle goes out of scope at the end of this block after 100ms;
        // that only detaches the task, it does not cancel it
        time::sleep(Duration::from_millis(100)).await;
    }

    println!("Task was cancelled");
}

Alas, no, dropping the handle does NOT cancel the running task!

Aborting the task

This is the most drastic way of canceling the task allowing no room for cleanup:

use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // do some work
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Task completed");
    });

    // Cancel the task after 100 milliseconds
    time::sleep(Duration::from_millis(100)).await;
    handle.abort();
    time::sleep(Duration::from_secs(2)).await;

    println!("Task was cancelled");
}

Using the oneshot channel

If you just need a one-off cancellation signal sent to a single task, the oneshot channel works like a charm.

The oneshot channel allows a single send on the channel, which is received by a single receiver. Unlike abort, this pattern lets your task do some cleanup. Here’s an example:

use tokio::time::Duration;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    let task = tokio::spawn(async move {
        tokio::select! {
            _ = rx => {
                println!("Task is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task completed normally");
            }
        }
        println!("Task is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Send a cancellation signal
    let _ = tx.send(());

    // Wait for the task to finish
    // NOTE: we could do this instead:
    // let _ = tokio::join!(task);
    let _ = task.await;
}

Now, if you run this you’ll get something like this:

Task is cancelling...
Task is cleaning up

The limitation of the oneshot channel is that you can’t use it to cancel multiple tasks. It’s literally designed and optimised for one-off notifications. So beware of that!

Using the broadcast channel to cancel multiple tasks

If you want to cancel multiple tasks you can use the broadcast channel. You can have multiple producers sending to the channel as well as multiple consumers receiving from it. Each receiver sees every value sent on the channel. Handy!

Here’s a simple example of how to use it to cancel multiple tasks and let them clean up:

use tokio::sync::broadcast;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(1);
    let mut rx2 = tx.subscribe();

    let task1 = tokio::spawn(async move {
        tokio::select! {
            _ = rx1.recv() => {
                println!("Task 1 is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task 1 completed normally");
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        tokio::select! {
            _ = rx2.recv() => {
                println!("Task 2 is cancelling...");
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Task 2 completed normally");
            }
        }
        println!("Task 2 is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Send a cancellation signal
    let _ = tx.send(());

    // Wait for the tasks to finish
    let _ = tokio::join!(task1, task2);
}

If you run this program you’ll get something like this:

Task 2 is cancelling...
Task 2 is cleaning up
Task 1 is cancelling...
Task 1 is cleaning up

Note that the order of the cancellation may vary in your case as the tasks might be cancelled in a different order!

The broadcast channel might be a bit of an overkill if you just want to send a cancellation signal from a single task to multiple tasks because it provides all the machinery for passing messages between multiple tasks.

This is handy if you need both message passing and cancellation, but if you need just the cancellation there is a better way. Or maybe not necessarily better but one with less overhead: the watch channel.

Using the watch channel to cancel multiple tasks

The watch channel is a single-producer, multi-consumer channel. Again, the watch channel gives the tasks the opportunity to clean up after themselves. The downside is that the consumers only see the most recent value sent on the channel. This means that if a task subscribes after the cancellation value has been sent, changed() will not fire for it (the value is already marked as seen), so the task might not get cancelled unless it also checks the current value. Beware of this. Here’s a simple example:

use tokio::sync::watch;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = watch::channel(false);
    let mut rx2 = tx.subscribe();

    let task1 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = rx1.changed() => {
                    if *rx1.borrow() {
                        println!("Task 1 is cancelling...");
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 1 completed normally");
                    break;
                }
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = rx2.changed() => {
                    if *rx2.borrow() {
                        println!("Task 2 is cancelling...");
                        break;
                    }
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 2 completed normally");
                    break;
                }
            }
        }
        println!("Task 2 is cleaning up");
    });

    tokio::time::sleep(Duration::from_millis(100)).await;

    // Send a cancellation signal
    let _ = tx.send(true);

    // Wait for the tasks to finish
    let _ = tokio::join!(task1, task2);
}

This channel was designed to watch for changes in your program. Kinda like pub/sub on a specific piece of data: the docs mention watching configuration changes as a canonical example, but you can use it just as well for cancellation.

Note how we initialized the channel with false and how we send true to signal the cancellation. Each task has to check whether it received true, as it may just as well receive false. We could also send some enum or String down like Foo::cancel or "cancel", but again, each task would have to check if it’s a cancellation signal and do some cleanup if it is and then exit.

Cancellation tokens

The official tokio documentation lists something called a CancellationToken in the article about graceful shutdown. This isn’t available in the tokio crate itself, but instead in the related tokio_util crate.

I’ve not used it in any of my projects as I’m trying to avoid pulling in more dependencies than the ones I’m already using, but it’s another interesting option that is actually geared for cancellations.

use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // Create a CancellationToken
    let token = CancellationToken::new();

    let token1 = token.clone();
    let token2 = token.clone();

    let task1 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token1.cancelled() => {
                        println!("Task 1 is cancelling...");
                        break;
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 1 completed normally");
                    break;
                }
            }
        }
        println!("Task 1 is cleaning up");
    });

    let task2 = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token2.cancelled() => {
                        println!("Task 2 is cancelling...");
                        break;
                }
                _ = tokio::time::sleep(Duration::from_secs(10)) => {
                    println!("Task 2 completed normally");
                    break;
                }
            }
        }
        println!("Task 2 is cleaning up");
    });

    sleep(Duration::from_millis(100)).await;

    // Send a cancellation signal
    token.cancel();

    // Wait for the tasks to finish
    let _ = tokio::join!(task1, task2);
}

Notice how we cloned the token so we can move it to individual async tasks. One thing worth mentioning is there is also something called child_token which, to quote the official documentation:

Unlike a cloned CancellationToken, cancelling a child token does not cancel the parent token.

Conclusion

This is the list of the cancellation options I’ve compiled together over the past couple of weeks whilst learning about tokio. As I said, it’s by no means a complete list and there are likely many more options at one’s disposal which I’m keen to learn about, so don’t be shy and hit me up in the comments. You can find all the code samples listed in this blog post on GitHub for reference. Don’t be shy about opening a PR with new patterns or fixes to the existing ones!

Update: 19/04/2024: colorfulchew correctly pointed out on Reddit that using drop on the task handle does not actually cancel the task – you must use abort() to cancel the task instead. abcSilverline mentioned that I misread the official documentation that explicitly says: “A JoinHandle detaches the associated task when it is dropped, which means that there is no longer any handle to the task, and no way to join on it.”

