Итерирование и параллелизм
Подобно синхронным итераторам, существует множество различных способов итерации
и обработки значений в Stream
. Существуют методы комбинаторного стиля
например, map
, filter
и fold
и их братьев раннего-выхода-из-за-ошибки
try_map
, try_filter
и try_fold
.
К сожалению, цикл for
не может использоваться для Stream
, но для
императивного стиля написания кода, могут быть использованы while let
и функции next
/try_next
:
#![allow(unused)] fn main() { async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 { use futures::stream::StreamExt; // for `next` let mut sum = 0; while let Some(item) = stream.next().await { sum += item; } sum } async fn sum_with_try_next( mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>, ) -> Result<i32, io::Error> { use futures::stream::TryStreamExt; // for `try_next` let mut sum = 0; while let Some(item) = stream.try_next().await? { sum += item; } Ok(sum) } }
Однако, если мы просто обрабатываем один элемент за раз, мы потенциально
оставляем возможность для параллелизма, который, в конце концов, стоит на первом месте при написании асинхронного кода. Для обработки нескольких элементов из потока
одновременно, используйте методы for_each_concurrent
и try_for_each_concurrent
:
#![allow(unused)] fn main() { async fn jump_around( mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>, ) -> Result<(), io::Error> { use futures::stream::TryStreamExt; // for `try_for_each_concurrent` const MAX_CONCURRENT_JUMPERS: usize = 100; stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } }