Итерирование и параллелизм

Подобно синхронным итераторам, существует множество различных способов итерации и обработки значений в Stream. Существуют методы комбинаторного стиля например, map, filter и fold и их братьев раннего-выхода-из-за-ошибки try_map, try_filter и try_fold.

К сожалению, цикл for не может использоваться для Stream, но для императивного стиля написания кода, могут быть использованы while let и функции next/try_next:


#![allow(unused_variables)]
fn main() {
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}
}

Однако, если мы просто обрабатываем один элемент за раз, мы потенциально оставляем возможность для параллелизма, который, в конце концов, стоит на первом месте при написании асинхронного кода. Для обработки нескольких элементов из потока одновременно, используйте методы for_each_concurrent и try_for_each_concurrent:


#![allow(unused_variables)]
fn main() {
async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}
}