select!
Макрос futures::select
запускает несколько футур
одновременно и передаёт управление пользователю, как только любая из футур завершится.
#![allow(unused)] fn main() { use futures::{ future::FutureExt, // for `.fuse()` pin_mut, select, }; async fn task_one() { /* ... */ } async fn task_two() { /* ... */ } async fn race_tasks() { let t1 = task_one().fuse(); let t2 = task_two().fuse(); pin_mut!(t1, t2); select! { () = t1 => println!("task one completed first"), () = t2 => println!("task two completed first"), } } }
Функция выше запустит обе t1
и t2
параллельно. Когда t1
или t2
закончится, соответствующий дескриптор вызовет
println!
и функция завершится без выполнения
оставшейся задачи.
Базовый синтаксис для select
: <pattern> = <expression> => <code>,
,
повторяемый столько раз, из скольких футур вам надо сделать select
.
default => ...
и complete => ...
Также select
поддерживает ветки default
и complete
.
Ветка default
выполнится, если ни одна из футур,
переданных в select
, не завершится. Поэтому
select
с веткой default
всегда будет
незамедлительно завершаться, так как default
будет
запущен, когда ещё ни одна футура не готова.
Ветка complete
может быть использована для
обработки случая, когда все футуры, бывшие в
select
, завершились и уже не могут прогрессировать. Это бывает удобно, когда select!
используется в цикле.
#![allow(unused)] fn main() { use futures::{future, select}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, default => unreachable!(), // never runs (futures are ready, then complete) }; } assert_eq!(total, 10); } }
Взаимодействие с Unpin
и FusedFuture
Одна вещь, на которую вы могли обратить внимание в первом
примере, это то, что мы вызвали .fuse()
для футур,
возвращённых из двух async fn
, а потом закрепили
их с помощью pin_mut
. Оба этих вызова важны,
потому что футуры, используемые в select
, должны
реализовывать и типаж Unpin
, и типаж
FusedFuture
.
Unpin
важен по той причине, что футуры, используемые в select
, берутся не по значению, а по изменяемой ссылке. Так как владение футурами никому не передано, незавершённые футуры могут быть снова использованы после вызова select
.
Аналогично, типаж FusedFuture
необходим, так как select
не должен опрашивать футуры после их
завершения. FusedFuture
реализуется футурами, которые отслеживают, завершены ли они или нет. Это делает
возможным использование select
в цикле, опрашивая только те футуры, которые до сих пор не завершились.
Это можно увидеть в примере выше, где a_fut
или b_fut
будут завершены во второй раз за цикл. Так
как футура, возвращённая future::ready
, реализует FusedFuture
, она может сообщить
select
, что её не надо снова опросить.
Заметьте, что у стримов есть соответствующий типаж FusedStream
. Стримы, реализующие этот типаж
или имеющие обёртку, созданную .fuse()
, возвращают FusedFuture
из комбинаторов
.next()
и .try_next()
.
#![allow(unused)] fn main() { use futures::{ stream::{Stream, StreamExt, FusedStream}, select, }; async fn add_two_streams( mut s1: impl Stream<Item = u8> + FusedStream + Unpin, mut s2: impl Stream<Item = u8> + FusedStream + Unpin, ) -> u8 { let mut total = 0; loop { let item = select! { x = s1.next() => x, x = s2.next() => x, complete => break, }; if let Some(next_num) = item { total += next_num; } } total } }
Распараллеливание задач в цикле с select
с помощью Fuse
и FuturesUnordered
Одна довольно труднодоступная, но удобная функция - Fuse::terminated()
, которая позволяет создавать уже
прекращённые пустые футуры, которые в последствии могут быть заполнены другой футурой, которую надо запустить.
Это может быть удобно, когда есть задача, которую надо запустить в цикле в select
, но которая
создана вне этого цикла.
Обратите внимание на функцию .select_next_some()
. Она может использоваться с select
для запуска полученных из стрима тех
ветвей, которые имеют значение Some(_)
, а не None
.
#![allow(unused)] fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) { /* ... */ } async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let run_on_new_num_fut = run_on_new_num(starting_num).fuse(); let get_new_num_fut = Fuse::terminated(); pin_mut!(run_on_new_num_fut, get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived -- start a new `run_on_new_num_fut`, // dropping the old one. run_on_new_num_fut.set(run_on_new_num(new_num).fuse()); }, // Run the `run_on_new_num_fut` () = run_on_new_num_fut => {}, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } }
Когда надо одновременно запустить много копий какой-либо футуры, используйте тип FuturesUnordered
.
Следующий пример похож на один из тех, что был выше, но здесь мы дожидаемся завершения каждой выполненной копии
run_on_new_num_fut
, а не останавливаем её при создании новой. Он также отобразит значение, возвращённое
run_on_new_num_fut
.
#![allow(unused)] fn main() { use futures::{ future::{Fuse, FusedFuture, FutureExt}, stream::{FusedStream, FuturesUnordered, Stream, StreamExt}, pin_mut, select, }; async fn get_new_num() -> u8 { /* ... */ 5 } async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 } async fn run_loop( mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin, starting_num: u8, ) { let mut run_on_new_num_futs = FuturesUnordered::new(); run_on_new_num_futs.push(run_on_new_num(starting_num)); let get_new_num_fut = Fuse::terminated(); pin_mut!(get_new_num_fut); loop { select! { () = interval_timer.select_next_some() => { // The timer has elapsed. Start a new `get_new_num_fut` // if one was not already running. if get_new_num_fut.is_terminated() { get_new_num_fut.set(get_new_num().fuse()); } }, new_num = get_new_num_fut => { // A new number has arrived -- start a new `run_on_new_num_fut`. run_on_new_num_futs.push(run_on_new_num(new_num)); }, // Run the `run_on_new_num_futs` and check if any have completed res = run_on_new_num_futs.select_next_some() => { println!("run_on_new_num_fut returned {:?}", res); }, // panic if everything completed, since the `interval_timer` should // keep yielding values indefinitely. complete => panic!("`interval_timer` completed unexpectedly"), } } } }