select!

Макрос futures::select запускает несколько футур одновременно и передаёт управление пользователю, как только любая из футур завершится.


#![allow(unused_variables)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

Функция выше запустит обе t1 и t2 параллельно. Когда t1 или t2 закончится, соответствующий дескриптор вызовет println! и функция завершится без выполнения оставшейся задачи.

Базовый синтаксис для select: <pattern> = <expression> => <code>,, повторяемый столько раз, из скольких футур вам надо сделать select.

default => ... и complete => ...

Также select поддерживает ветки default и complete.

Ветка default выполнится, если ни одна из футур, переданных в select, не завершится. Поэтому select с веткой default всегда будет незамедлительно завершаться, так как default будет запущен, когда ещё ни одна футура не готова.

Ветка complete может быть использована для обработки случая, когда все футуры, бывшие в select, завершились и уже не могут прогрессировать. Это бывает удобно, когда select! используется в цикле.


#![allow(unused_variables)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

Взаимодействие с Unpin и FusedFuture

Одна вещь, на которую вы могли обратить внимание в первом примере, это то, что мы вызвали .fuse() для футур, возвращённых из двух async fn, а потом закрепили их с помощью pin_mut. Оба этих вызова важны, потому что футуры, используемые в select, должны реализовывать и типаж Unpin, и типаж FusedFuture.

Unpin важен по той причине, что футуры, используемые в select, берутся не по значению, а по изменяемой ссылке. Так как владение футурами никому не передано, незавершённые футуры могут быть снова использованы после вызова select.

Аналогично, типаж FusedFuture необходим, так как select не должен опрашивать футуры после их завершения. FusedFuture реализуется футурами, которые отслеживают, завершены ли они или нет. Это делает возможным использование select в цикле, опрашивая только те футуры, которые до сих пор не завершились. Это можно увидеть в примере выше, где a_fut или b_fut будут завершены во второй раз за цикл. Так как футура, возвращённая future::ready, реализует FusedFuture, она может сообщить select, что её не надо снова опросить.

Заметьте, что у стримов есть соответствующий типаж FusedStream. Стримы, реализующие этот типаж или имеющие обёртку, созданную .fuse(), возвращают FusedFuture из комбинаторов .next() и .try_next().


#![allow(unused_variables)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

Распараллеливание задач в цикле с select с помощью Fuse и FuturesUnordered

Одна довольно труднодоступная, но удобная функция - Fuse::terminated(), которая позволяет создавать уже прекращённые пустые футуры, которые в последствии могут быть заполнены другой футурой, которую надо запустить.

Это может быть удобно, когда есть задача, которую надо запустить в цикле в select, но которая создана вне этого цикла.

Обратите внимание на функцию .select_next_some(). Она может использоваться с select для запуска полученных из стрима тех ветвей, которые имеют значение Some(_), а не None.


#![allow(unused_variables)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

Когда надо одновременно запустить много копий какой-либо футуры, используйте тип FuturesUnordered. Следующий пример похож на один из тех, что был выше, но здесь мы дожидаемся завершения каждой выполненной копии run_on_new_num_fut, а не останавливаем её при создании новой. Он также отобразит значение, возвращённое run_on_new_num_fut.


#![allow(unused_variables)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}