Начало работы

translated

Добро пожаловать в книгу "Асинхронное программирование в Rust"! Если вы собираетесь начать писать асинхронный код на Rust, вы находитесь в правильном месте. Строите ли вы веб-сервер, базу данных или операционную систему, эта книга покажет вам как использовать инструменты асинхронного программирования, чтобы получить максимальную отдачу от вашего оборудования.

Что охватывает эта книга?

Эта книга призвана стать исчерпывающим, современным руководством по использованию асинхронных языковых возможностей и библиотек Rust, подходящим как новичкам, так и опытным разработчикам.

  • Ранние главы содержат введение в асинхронное программирование в целом, а также его особенности в Rust.

  • В средних главах обсуждаются ключевые утилиты и инструменты управления потоком, которые вы можете использовать, когда пишете асинхронный код. Также здесь описаны лучшие практики структурирования библиотек и приложений для получения максимальной производительности и повторного использования кода.

  • Последняя глава книги покрывает асинхронную экосистему и предоставляет ряд примеров того, как можно выполнить общие задачи.

Итак, давайте исследуем захватывающий мир асинхронного программирования в Rust!

Для чего нужна асинхронность?

Все мы любим то, что Rust позволяет нам писать быстрые и безопасные приложения. Но для чего писать асинхронный код?

Асинхронный код позволяет нам запускать несколько задач параллельно в одном потоке ОС. Если вы в обычном приложении хотите одновременно загрузить две разных web-страницы, вы должны разделить работу между двумя разным потоками, как тут:

fn get_two_sites() {
    // Spawn two threads to do work.
    let thread_one = thread::spawn(|| download("https://www.foo.com"));
    let thread_two = thread::spawn(|| download("https://www.bar.com"));

    // Wait for both threads to complete.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

Для многих приложений это замечательно работает - в конце концов, потоки были разработаны именно для этого: одновременно запускать несколько разных задач. Однако, они имеют некоторые ограничения. В процессе переключения между разными потоками и обменом данными между ними возникает много накладных расходов. Даже поток, который сидит и ничего не делает, использует ценные системные ресурсы. Асинхронный код предназначен для устранения этих проблем. Мы можем переписать функции выше используя нотацию async/.await, которая позволяет нам запустить несколько задач одновременно, не создавая нескольких потоков:

async fn get_two_sites_async() {
    // Create two different "futures" which, when run to completion,
    // will asynchronously download the webpages.
    let future_one = download_async("https://www.foo.com");
    let future_two = download_async("https://www.bar.com");

    // Run both futures to completion at the same time.
    join!(future_one, future_two);
}

В целом, асинхронные приложения могут быть намного быстрее и использовать меньше ресурсов, чем соответствующая многопоточная реализация. Однако, есть и обратная сторона. Потоки изначально поддерживаются операционной системой и их использование не требует какой-либо специальной модели программирования - любая функция может создать поток и вызов функции, использующей потоки, обычно так же прост, как вызов обычной функции. Тем не менее, асинхронные функции требуют специальной поддержки со стороны языка или библиотек. В Rust, async fn создаёт асинхронную функцию, которая возвращает Future. Для выполнения тела функции, возвращённая Future должна быть завершена.

Важно помнить, что традиционные приложения с потоками могут быть вполне эффективными и предсказуемость Rust и небольшой объём используемой памяти могут значить, что вы можете далеко продвинуться и без использования async. Повышенная сложность асинхронной модели программирования не всегда стоит этого и важно понимать, когда ваше приложение будет лучше работать с использованием простой поточной модели.

Состояние асинхронности в Rust

Асинхронная экосистема Rust претерпела большие изменения с течением времени, поэтому может быть трудно понять, какие инструменты использовать, в какие библиотеки инвестировать, или какую документацию читать. Однако типаж Future внутри стандартной библиотеки и async/await в языке были недавно стабилизированы. Таким образом, экосистема в целом находится в процессе миграции к недавно стабилизированному API, после чего точка оттока будет значительно уменьшена.

Тем не менее, сейчас экосистема всё ещё находится в стадии активной разработки и асинхронный опыт в Rust не отполирован. Многие библиотеки до сих пор используют пакет futures версии 0.1, а это значит, что для взаимодействия с ними разработчикам часто требуется функциональность compat из пакета futures версии 0.3. Синтаксис async/await до сих пор достаточно нов. Важные расширения синтаксиса, такие как async fn для методов типажей, до сих пор не реализованы, и текущие сообщения компилятора об ошибках могут быть сложны для восприятия.

Это говорит о том, что Rust на пути к более эффективной и эргономичной поддержке асинхронного программирования и если вы не боитесь изменений, наслаждайтесь погружением в мир асинхронного программирования в Rust!

Пример async/.await

async/.await - инструменты для написания асинхронного кода встроенные в Rust, внешне похожие на синхронный код. Ключевое слово async превращает исполняемый блок программы в конечный автомат, который реализует типаж Future. В синхронном методе вызов функции блокирует весь поток, в асинхронном же вызов через Future вернёт контроль над потоком, позволяя работать другим Future.

Добавим некоторые зависимости в файл Cargo.toml:

[dependencies]
futures = "0.3"

При реализации асинхронной функции можно использовать такой синтаксис async fn:


#![allow(unused)]
fn main() {
async fn do_something() { /* ... */ }
}

async fn возвращает значение, которые является Future. Что бы ни произошло, Future должна быть запущена в исполнителе.

// `block_on` blocks the current thread until the provided future has run to
// completion. Other executors provide more complex behavior, like scheduling
// multiple futures onto the same thread.
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // Nothing is printed
    block_on(future); // `future` is run and "hello, world!" is printed
}

Внутри async fn можно использовать .await для ожидания завершения другой реализации типажа Future, например, полученной из другой async fn). В отличие от block_on, .await не блокирует текущий поток, а асинхронно ожидает завершения футуры, позволяя другим задачам в потоке выполняться, пока эта футура не может быть исполнена.

Например, представим что у нас есть три асинхронные функции async fn: learn_song, sing_song и dance:

async fn learn_song() -> Song { ... }
async fn sing_song(song: Song) { ... }
async fn dance() { ... }

Одним из способов выполнения функций «разучить песню», «спеть» и «станцевать» будет блокировка потока исполнения на каждой из них индивидуально:

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

В этом случае мы не достигаем наилучшей производительности - в один момент времени мы делаем только одно дело! Конечно, мы должны выучить песню до того, как петь её, но мы можем танцевать одновременно с разучиванием песни и пением. Чтобы провернуть такой вариант, создадим две отдельные функции с async fn, которые могут запуститься параллельно:

async fn learn_and_sing() {
    // Wait until the song has been learned before singing it.
    // We use `.await` here rather than `block_on` to prevent blocking the
    // thread, which makes it possible to `dance` at the same time.
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` is like `.await` but can wait for multiple futures concurrently.
    // If we're temporarily blocked in the `learn_and_sing` future, the `dance`
    // future will take over the current thread. If `dance` becomes blocked,
    // `learn_and_sing` can take back over. If both futures are blocked, then
    // `async_main` is blocked and will yield to the executor.
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

В этом примере, разучивание песни должно быть завершено до пения, но разучивание и пение могут завершиться одновременно с танцем. Если бы мы использовали block_on(learn_song() ) вместо learn_song().await внутри learn_and_sing, поток не смог бы делать ничего другого, пока работает learn_song. Из-за этого мы одновременно с этим не можем танцевать. С помощью ожидания .await футурыlearn_song, мы разрешаем другим задачам захватить текущий поток исполнения, пока learn_song заблокирована. Это даёт возможность запускать нескольких футур в одном потоке параллельно.

Применение: HTTP сервер

Давайте используем async/.await для создания echo-сервера!

Для начала, запустите rustup update stable чтобы быть уверенным, что используете стабильную версию Rust - 1.39 или более новую. Когда вы закончите это, создайте новый проект с помощь cargo new async-await-echo и откройте созданную директорию async-await-echo.

Добавим некоторые зависимости в файл Cargo.toml:

{{#include ../../examples/01_05_http_server/Cargo.toml:9:18}}

Теперь, когда у нас есть свои зависимости, давайте начнём писать код. Вот список зависимостей, которые необходимо добавить:


#![allow(unused)]
fn main() {
{{#include ../../examples/01_05_http_server/src/lib.rs:imports}}
}

Как только закончим с импортами, мы можем собрать вместе весь шаблонный код, который позволит обрабатывать запросы:


#![allow(unused)]
fn main() {
{{#include ../../examples/01_05_http_server/src/lib.rs:boilerplate}}
}

Если вы сейчас запустите cargo run, в консоли вы увидите сообщение "Listening on http://127.0.0.1:3000". Если вы откроете URL в вашем любимом браузере, вы увидите как в нём отобразится "hello, world!". Поздравляем! Вы только что написали свой первый асинхронный web-сервер на Rust.

Вы также можете посмотреть сам запрос, который содержит такую информацию, как URI, версию HTTP, заголовки и другие метаданные. Например, мы можем вывести URI запроса следующим образом:


#![allow(unused)]
fn main() {
println!("Got request at {:?}", req.uri());
}

Вы могли заметить, что мы до сих пор не делали ничего асинхронного для обработки запроса - мы только незамедлительно ответили на него, мы не пользуемся гибкостью, которую нам даёт async fn. Вместо этого, мы только возвращаем статическое сообщение. Давайте попробуем проксировать пользовательский запрос на другой web-сайт используя HTTP-клиент Hyper'а.

Мы начнём с парсинга URL, который мы хотим запросить:


#![allow(unused)]
fn main() {
{{#include ../../examples/01_05_http_server/src/lib.rs:parse_url}}
}

Затем мы создадим новый hyper::Client и используем его для создания GET запроса, который вернём пользователю ответ:


#![allow(unused)]
fn main() {
{{#include ../../examples/01_05_http_server/src/lib.rs:get_request}}
}

Client::get возвращает hyper::client::FutureResponse, который реализует Future<Output = Result<Response, Error>> (или Future<Item = Response, Error = Error> в терминах futures 0.1). Когда мы разрешаем (.await) футуру, отправляется HTTP-запрос, текущая задача приостанавливается и становится в очередь, чтобы продолжить работу после получения ответа.

Если вы сейчас запустите cargo run и откроете http://127.0.0.1:3000/foo в браузере, вы увидите домашнюю страницу Rust, а в консоли следующий вывод:

Listening on http://127.0.0.1:3000
Got request at /foo
making request to http://www.rust-lang.org/en-US/
request finished-- returning response

Поздравляем! Вы только что проксировали HTTP запрос.

Под капотом: выполнение Future и задач

В этом разделе мы рассмотрим как планируются Future и асинхронные задачи. Если вам только интересно изучить как писать высокоуровневый код, который использует существующие типы Future, и не интересно, как работает Future, то можете сразу перейти к главе async/await. Тем не менее, некоторые темы, которые обсуждаются в этой главе, полезны для понимания работы async/await кода и построения новых асинхронных примитивов. Если сейчас вы решили пропустить этот раздел, вы можете добавить его в закладки, чтобы вернуться к нему в будущем.

Теперь давайте рассмотрим типаж Future.

Типаж Future

Типаж Future является центральным для асинхронного программирования в Rust. Future - это асинхронное вычисление, которое может производить значение (хотя значение может быть и пустым, например ()). Упрощённый вариант этого типажа может выглядеть как-то так:


#![allow(unused)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

Футуры могут прогрессировать при помощи функции poll, которая продвигает их к завершению на сколько это возможно. Если футура завершается, она возвращает Poll::Ready(result). Если же она всё ещё не готова завершиться, то - Poll::Pending и обрабатывает функцию wake() таким образом, что она будет вызвана, когда Future будет готова прогрессировать. Когда wake() вызван, исполнитель снова вызывает у Future метод poll, чтобы она смогла продвинуться далее.

Без wake(), исполнитель не имеет возможности узнать, когда какая-либо футура может продвинуться, и ему необходимо постоянно опрашивать каждую футуру. С wake() он точно знает какие футуры готовы прогрессировать.

Например, представим ситуацию, когда мы хотим прочитать из сокета, который может иметь, а может и не иметь данных. Если данные есть, мы можем прочитать их и вернуть Poll::Ready(data), но если данных ещё нет, наша футура блокируется и не может продвинуться дальше. Если данных нет, то мы должны зарегистрировать вызов wake, чтобы он был вызван, когда данные появятся в сокете, и сообщил нашему исполнителю, что футура готова прогрессировать. Простая футура SocketRead может выглядеть следующим образом:


#![allow(unused)]
fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
}

Такая модель футур позволяет держать вместе несколько асинхронных операций без лишних промежуточных выделений памяти. Одновременный запуск нескольких футур или соединение их в цепочку может быть реализовано при помощи не выделяющей памяти машины состояний, например так:


#![allow(unused)]
fn main() {
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}
}

Здесь показано, как несколько футур могут быть запущены одновременно без необходимости раздельной аллокации, позволяя асинхронным программам быть более эффективными. Аналогично, несколько последовательных футур могут быть запущены одна за другой, как тут:


#![allow(unused)]
fn main() {
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}
}

Этот пример показывает, как типаж Future может использоваться для выражения асинхронного управления потоком без необходимости множественной аллокации объектов и глубоко вложенных замыканий. Давайте оставим базовое управление потоком в стороне и поговорим о реальном типаже Future и чем он отличается от написанного нами.


#![allow(unused)]
fn main() {
trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
}

Первое, что вы могли заметить, что наш тип self больше не &mut self, а заменён на Pin<&mut Self>. Мы поговорим о закреплении (pinning) в следующей секции, но пока что знайте, что оно позволяет нам создавать неперемещаемые футуры. Неперемещаемые объекты могут хранить указатели на собственные поля, например struct MyFut { a: i32, ptr_to_a: *const i32 }. Закрепление необходимо для async/await.

Второе, wake: fn() была изменена на &mut Context<'_>. В SimpleFuture мы использовали вызов указателя на функцию (fn()), чтобы сказать исполнителю, что футура должна быть опрошена. Однако, так как fn() имеет нулевой тип, она не может сохранить информацию о том какая футура вызвала wake.

В примере из реального мира, сложное приложение, такое как web-сервер, может иметь тысячи различных подключений все пробуждения которых должны обрабатываться отдельно. Тип Context решает это предоставляя доступ к значению типа Waker, который может быть использован для пробуждения конкретной задачи.

Вызовы задачи при помощи Waker

Обычно футуры не могут разрешиться сразу же, как для них вызвали метод poll. Когда это произойдёт, футура должна быть уверена, что её снова опросят, когда она будет готова прогрессировать. Это решается при помощи типа Waker.

Футура опрашивается как часть "задачи" каждый раз, когда происходит её опрос. Задачи - это высокоуровневые футуры, с которыми работает исполнитель.

Waker предоставляет метод wake(), который может быть использован, чтобы сказать исполнителю, что соответствующая задача должна быть пробуждена. Когда вызывается wake(), исполнитель знает, что задача, связанная с Waker, готова к выполнению, и её футура должна быть опрошена снова.

Waker так же реализует clone(), так что вы можете его копировать, где это необходимо, и хранить.

Давайте попробуем реализовать простой таймер с использованием Waker.

Применение: Создание таймера

В качестве примера, мы просто раскручиваем новый поток при создании таймера, спим в течение необходимого времени, а затем через какое-то время сообщаем о том, что заданный временной промежуток истёк.

Вот список импортов, которые нам понадобятся:


#![allow(unused)]
fn main() {
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
}

Начнём с определения типа футуры. Нашей футуре необходим канал связи, чтобы сообщить о том, что время таймера истекло и футура должна завершиться. В качестве канала связи между таймером и футурой мы будем использовать разделяемое значение Arc<Mutex<..>>.


#![allow(unused)]
fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}
}

Теперь давайте реализуем Future для нашей футуры!


#![allow(unused)]
fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
}

Просто, не так ли? Если поток установит shared_state.completed = true, мы закончили! В противном случае мы клонируем Waker для текущей задачи и сохраняем его в shared_state.waker. Так поток может разбудить задачу позже.

Важно отметить, что мы должны обновлять Waker каждый раз, когда футура опрашивается, потому что она может быть перемещена в другую задачу с другим Waker. Это может произойти когда футуры после опроса передаются между задачами.

Наконец, нам нужен API, чтобы фактически построить таймер и запустить поток:


#![allow(unused)]
fn main() {
impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
}

Это всё, что нам нужно, чтобы построить простую футуру таймером. Теперь нам нужен исполнитель, чтобы запустить её на исполнение.

Применение: создание исполнителя

Футуры Rust'a ленивы: они ничего не будут делать, если не будут активно доводиться до завершения. Один из способов довести их до завершения - это .await внутри async функции, но это просто подталкивает проблему на один уровень вверх: кто будет запускать футуры, возвращённые из async функций верхнего уровня? Ответ в том, что нам нужен исполнитель для Future.

Исполнители берут набор future верхнего уровня и запускают их через вызов метода poll, до тех пока они не завершатся. Как правило, исполнитель будет вызывать метод poll у future один раз, чтобы запустить. Когда future сообщают, что готовы продолжить вычисления при вызове метода wake(), они помещаются обратно в очередь и вызов poll повторяется до тех пор, пока Future не будут завершены.

В этом разделе мы напишем нашего собственного простого исполнителя, способного одновременно запускать большое количество футур верхнего уровня.

Для этого примера мы используем пакет futures, в котором определён типаж ArcWake. Данный типаж предоставляет простой способ создания Waker.

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures = "0.3"

Дальше, мы должны в верхней части src/main.rs импортировать следующее:

use {
    futures::{
        future::{BoxFuture, FutureExt},
        task::{waker_ref, ArcWake},
    },
    std::{
        future::Future,
        sync::mpsc::{sync_channel, Receiver, SyncSender},
        sync::{Arc, Mutex},
        task::{Context, Poll},
        time::Duration,
    },
    // The timer we wrote in the previous section:
    timer_future::TimerFuture,
};

Наш исполнитель будет работать, посылая задачи для запуска по каналу. Исполнитель извлечёт события из канала и запустит их. Когда задача будет готова выполнить больше работы (будет пробуждена), она может запланировать повторный опрос самой себя, отправив себя обратно в канал.

В этом проекте самому исполнителю просто необходим получатель для канала задачи. Пользователь получит экземпляр отправителя, чтобы он мог создавать новые футуры. Сами задачи - это просто футуры, которые могут перезапланировать самих себя, поэтому мы сохраним их как сочетание футуры и отправителя, который задача может использовать, чтобы добавить себя в очередь.

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

Давайте ещё добавим метод в spawner, чтобы было проще создавать новые футуры. Этот метод возьмёт тип футуры, поместит его в коробку и создаст новую задачу Arc<Task> с ним внутри. И эта задача может быть помещена в очередь для исполнителя.

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

Чтобы опросить futures, нам нужно создать Waker. Как описано в разделе задачи пробуждения, Wakers отвечают за планирование задач, которые будут опрошены снова после вызова wake. Wakers сообщают исполнителю, какая именно задача завершилась, позволяя опрашивать как раз те futures, которые готовы к продолжению выполнения. Простой способ создать новый Waker, необходимо реализовать типаж ArcWake, а затем использовать waker_ref или .into_waker() функции для преобразования Arc<impl ArcWake> в Waker. Давайте реализуем ArcWake для наших задач, чтобы они были превращены в Wakers и могли пробуждаться:

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

Когда Waker создаётся на основе Arc<Task>, вызывая wake(), это вызовет отправку копии Arc в канал задач. Тогда нашему исполнителю нужно подобрать задание и опросить его. Давайте реализуем это:

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

Поздравляю! Теперь у нас есть работающий исполнитель futures. Мы даже можем использовать его для запуска async/.await кода и пользовательских futures, таких как TimerFuture которую мы описали ранее:

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}

Исполнители и системный ввод/вывод

В главе "Типаж Future", мы обсуждали футуру, которая выполняет асинхронное чтение сокета:


#![allow(unused)]
fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
}

Эта футура читает из сокета доступные данные и если таковых нет, то она передаётся исполнителю с запросом активирования задачи, когда сокет снова станет читаемым. Однако, из текущего примера не ясна реализация типа Socket и, в частности, не совсем очевидно как работает функция set_readable_callback. Как мы можем сделать так, чтобы lw.wake() был вызван, когда сокет станет читаемым? Один из вариантов - иметь поток, который постоянно проверяет стал ли socket читаемым, вызывая при необходимости метод wake(). Тем не менее, такой подход будет весьма не эффективным, так как он требует отдельного потока для каждой блокирующей IO футуры. Это значительно снизит эффективность нашего асинхронного кода.

На практике эта проблема решается при помощи интеграции с IO-зависимыми системными блокирующими примитивами такими, как epoll в Linux, kqueue во FreeBSD и Mac OS, IOCP в Windows и port в Fuchsia (все они предоставляются при помощи кроссплатформенного Rust-пакета mio). Все эти примитивы позволяют потоку заблокироваться с несколькими асинхронными IO-событиями, возвращая одно из завершённых событий. На практике эти API выглядят примерно так:


#![allow(unused)]
fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // ID уникально идентифицирующий событие, которое уже произошло и на которое мы подписались
    id: usize,

    // Набор сигналов, которые мы ожидаем или которые произошли
    signals: Signals,
}

impl IoBlocker {
    /// Создаём новую коллекцию асинхронных IO-событий для блокировки
    fn new() -> Self { ... }

    /// Подпишемся на определённое IO-событие.
    fn add_io_event_interest(
        &self,

        /// Объект, на котором происходит событие
        io_object: &IoObject,

        /// Набор сигналов, которые могут применяться к `io_object`,
        /// для которого должно быть инициировано событие, в паре с
        /// ID, которые передадутся событиям, получившимся в результате нашей подписки.
        event: Event,
    ) { ... }

    /// Заблокируется до появления одного из событий
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// выведет что-то похожее на "Socket 1 is now READABLE", если сокет станет доступным для чтения.
println!("Socket {:?} is now {:?}", event.id, event.signals);
}

Исполнители футур могут использовать эти примитивы для предоставления асинхронных объектов ввода-вывода, таких как сокеты, которые могут настроить обратные вызовы для запуска при определённом IO-событии. В случае нашего примера c SocketRead, функция Socket::set_readable_callback может выглядеть следующим псевдокодом:


#![allow(unused)]
fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` является ссылкой на локальный исполнитель.
        // Это может быть предусмотрено при создании сокета, 
        // большинство реализаций исполнителей делают это через локальный поток, так удобнее.
        let local_executor = self.local_executor;

        // Уникальный ID для объекта ввода вывода.
        let id = self.id;

        // Сохраним `waker` в данных исполнителя,
        // чтобы его можно было вызвать после того, как будет получено событие.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
}

Теперь у нас может быть только один поток исполнителя, который может принимать и отправлять любые события ввода-вывода в нужный Waker, который разбудит соответствующую задачу, позволяющая исполнителю довести больше задач до завершения перед возвратом к проверке новых событий ввода-вывода (и цикл продолжается...).

async/await

В первой главе мы бросили беглый взгляд на async/.await и использовали их чтобы построить простой сервер. В этой главе мы обсудим async/.await более подробно, объясняя, как они работают и как async-код отличается от традиционных программ на Rust.

async/.await - это специальный синтаксис Rust, который позволяет не блокировать поток, а передавать управление другому коду, пока ожидается завершение операции.

Существует два основных способа использования async: async fn и async-блоки. Каждый возвращает значение, реализующее типаж Future:


#![allow(unused)]

fn main() {
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    async {
        let x: u8 = foo().await;
        x + 5
    }
}
}

Как мы видели в первой главе, async-блоки и другие футуры ленивы: они ничего не делают, пока их не запустят. Наиболее распространённый способ запустить Future - это .await. Когда .await вызывается на Future, он пытается выполнить её до конца. Если Future заблокирована, то контроль будет передан текущему потоку. Чтобы добиться большего прогресса, будет выбрана верхняя Future исполнителя, позволяя .await продолжить работу.

Времена жизни async

В отличие от традиционных функций, async fn, которые принимают ссылки или другие не-'static аргументы, возвращают Future, которая ограничена временем жизни аргумента:


#![allow(unused)]
fn main() {
// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}
}

Это означает, что для футуры, возвращаемая из async fn, должен быть вызван .await до тех пор, пока её не-'static аргументы все ещё действительны. В общем случае, вызов .await у футуры сразу после вызова функции (как в foo(&x).await), не является проблемой. Однако, проблемой может оказаться сохранение футуры или отправка её в другую задачу или поток.

Один общий обходной путь для включения async fn со ссылками в аргументах в 'static футуру состоит в том, чтобы связать аргументы с вызовом async fn внутри async-блока:


#![allow(unused)]
fn main() {
fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}
}

Перемещая аргумент в async-блок, мы продлеваем его время жизни до времени жизни Future, которая возвращается при вызове good.

async move

async-блоки и async-замыкания позволяют использовать ключевое слово move, как и в обычном замыкании. async move блок получает владение переменными со ссылками, позволяя им пережить текущую область, но отказывая им в возможности делиться этими переменными с другим кодом:


#![allow(unused)]
fn main() {
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{}", my_string);
    };

    let future_two = async {
        // ...
        println!("{}", my_string);
    };

    // Run both futures to completion, printing "foo" twice:
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{}", my_string);
    }
}
}

.await в многопоточном исполнителе

Обратите внимание, что при использовании Future в многопоточном исполнителе, Future может перемещаться между потоками. Поэтому любые переменные, используемые в телах async, должны иметь возможность перемещаться между потоками, так как любой .await потенциально может привести к переключению на новый поток.

Это означает, что не безопасно использовать Rc, &RefCell или любые другие типы, не реализующие типаж Send (включая ссылки на типы, которые не реализуют типаж Sync).

(Предостережение: можно использовать эти типы до тех пор, пока они не находятся в области действия вызова .await.)

Точно так же не очень хорошая идея держать традиционную non-futures-aware блокировку через .await, так как это может привести к блокировке пула потоков: одна задача может получить объект блокировки, вызвать .await и передать управление исполнителю, разрешив другой задаче совершить попытку взять блокировку, что вызовет взаимную блокировку. Чтобы избежать этого, используйте Mutex из futures::lock, а не из std::sync.

Закрепление (pinning)

Чтобы опросить футуры, они должны быть закреплены с помощью специального типа Pin<T>. Если вы прочитали объяснение Future в предыдущем разделе «Выполнение Future и задач», вы узнаете Pin из self: Pin<&mut Self> в определении метода Future::poll. Но что это значит, и зачем нам это нужно?

Для чего нужно закрепление

Закрепление даёт возможность гарантировать, что объект не будет перемещён. Чтобы понять почему это важно, нам надо помнить как работает async/.await. Рассмотрим следующий код:

let fut_one = /* ... */;
let fut_two = /* ... */;
async move {
    fut_one.await;
    fut_two.await;
}

Под капотом, он создаёт анонимный тип, который реализует типаж Future, предоставляющий метод poll, выглядящий примерно так:

// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl Future for AsyncFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

Когда poll вызывается первый раз, он опрашивает fut_one. Если fut_one не завершена, возвращается AsyncFuture::poll. Следующие вызовы poll будут начинаться там, где завершился предыдущий вызов. Этот процесс продолжается до тех пор, пока future не сможет завершиться.

Однако, что будет, если async блок использует ссылки? Например:

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

Во что скомпилируется эта структура?

struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // указывает на `x` далее
}

struct AsyncFuture {
     x: [u8; 128],
     read_into_buf_fut: ReadIntoBuf<'?>, // какое тут время жизни?
}

Здесь future ReadIntoBuf содержит ссылку на другое поле нашей структуры, x. Однако, если AsyncFuture будет перемещена, положение x тоже будет изменено, что инвалидирует указатель, сохранённый в read_into_buf_fut.buf.

Закрепление футур в определённом месте памяти предотвращает эту проблему, делая безопасным создание ссылок на данные за пределами async блока.

Как использовать закрепление

Тип Pin оборачивает указатель на другие типы, гарантируя, что значение за указателем не будет перемещено. Например, Pin<&mut T&gt;, Pin<&T&gt;, Pin<Box<T>&gt; - все гарантируют, что положение T останется неизменным.

У большинства типов нет проблем с перемещением. Эти типы реализуют типаж Unpin. Указатели на Unpin-типы могут свободно помещаться в Pin или извлекаться из него. Например, тип u8 реализует Unpin, таким образом Pin<&mut u8> ведёт себя также, как и &mut u8.

Некоторые функции требуют, чтобы футуры, с которыми они работают, были Unpin. Чтобы использовать Future или Stream, который не реализует Unpin, с функцией, которая требует Unpin-типы, сначала нужно закрепить значение, используя либо Box::pin (чтобы создать Pin<Box<T>>) или макрос pin_utils::pin_mut! (чтобы создать Pin<&mut T>). Pin<Box<Fut>> и Pin<&mut Fut> могут быть использованы как футура и оба реализуют Unpin.

Например:

use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io

// A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { /* ... */ }

let fut = async { /* ... */ };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait

// Pinning with `Box`:
let fut = async { /* ... */ };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK

// Pinning with `pin_mut!`:
let fut = async { /* ... */ };
pin_mut!(fut);
execute_unpin_future(fut); // OK

Типаж Stream

Типаж Stream похож на Future, но до своего завершения может давать несколько значений. Также он похож на типаж Iterator из стандартной библиотеки:


#![allow(unused)]
fn main() {
trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}
}

Одним из распространённых примеров Stream является Receiver для типа канала из пакета futures. Он даёт Some(val) каждый раз, когда значение отправляется от Sender, и даст None после того, как Sender был удалён из памяти и все ожидающие сообщения были получены:


#![allow(unused)]
fn main() {
async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}
}

Итерирование и параллелизм

Подобно синхронным итераторам, существует множество различных способов итерации и обработки значений в Stream. Существуют методы комбинаторного стиля например, map, filter и fold и их братьев раннего-выхода-из-за-ошибки try_map, try_filter и try_fold.

К сожалению, цикл for не может использоваться для Stream, но для императивного стиля написания кода, могут быть использованы while let и функции next/try_next:


#![allow(unused)]
fn main() {
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}
}

Однако, если мы просто обрабатываем один элемент за раз, мы потенциально оставляем возможность для параллелизма, который, в конце концов, стоит на первом месте при написании асинхронного кода. Для обработки нескольких элементов из потока одновременно, используйте методы for_each_concurrent и try_for_each_concurrent:


#![allow(unused)]
fn main() {
async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}
}

Одновременное выполнение нескольких Future

До этого времени, мы в основном выполняли футуры используя .await, который блокирует текущую задачу до тех пор, пока не завершится отдельная Future. Однако, настоящие асинхронные приложения чаще всего должны выполнять несколько различных операций одновременно.

Одновременное выполнение нескольких Future

В этой главе мы рассмотрим разные способы одновременного выполнения нескольких асинхронных операций:

  • join!: ждёт завершения всех футур
  • select!: ждёт завершения одной из футур
  • Порождение: создание задач верхнего уровня, которые запускают футуры до их завершения
  • FuturesUnordered: группа футур, которые возвращают результат каждой подфутуры

join!

Макрос futures::join позволяет дождаться завершения нескольких разных футур при одновременном их выполнении.

join!

При выполнении нескольких асинхронных операций возникает соблазн просто последовательно вызвать несколько .await:


#![allow(unused)]
fn main() {
async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}
}

Однако это будет медленнее, чем необходимо, так как он не начнёт пытаться выполнять get_music до завершения get_book. В некоторых других языках, футуры выполняются до завершения, поэтому две операции могут быть запущены одновременно сначала вызовом каждой async fn, для запуска футур, а потом их ожиданием:


#![allow(unused)]
fn main() {
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}
}

Однако футуры на Rust не будут работать, пока для них не будет вызван .await. Это означает, что оба приведённых выше фрагмента кода запустят book_future и music_future последовательно, вместо того, чтобы запустить их параллельно. Чтобы правильно распараллелить их выполнение, используйте futures::join!:


#![allow(unused)]
fn main() {
use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}
}

Значение, возвращаемое join! - это кортеж, содержащий выходные данные каждой из переданных Future.

try_join!

Для футур, которые возвращают Result, может использоваться try_join!, а не join!. Так как join! завершается только после завершения всех подфутур, он будет продолжать обрабатывать другие футуры даже после того, как одна из подфутур вернёт Err.

В отличие отjoin!, try_join! завершится немедленно, если какая-либо из подфутур вернёт ошибку.


#![allow(unused)]
fn main() {
use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
}

Обратите внимание, что все футуры, переданные в try_join!, должны иметь один и тот же тип ошибки. Рассмотрите возможность использования функций .map_err(|e| ...) и .err_into() из futures::future::TryFutureExt для приведения типов ошибок к единому виду:


#![allow(unused)]
fn main() {
use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
}

select!

Макрос futures::select запускает несколько футур одновременно и передаёт управление пользователю, как только любая из футур завершится.


#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

Функция выше запустит обе t1 и t2 параллельно. Когда t1 или t2 закончится, соответствующий дескриптор вызовет println! и функция завершится без выполнения оставшейся задачи.

Базовый синтаксис для select: <pattern> = <expression> => <code>,, повторяемый столько раз, из скольких футур вам надо сделать select.

default => ... и complete => ...

Также select поддерживает ветки default и complete.

Ветка default выполнится, если ни одна из футур, переданных в select, не завершится. Поэтому select с веткой default всегда будет незамедлительно завершаться, так как default будет запущен, когда ещё ни одна футура не готова.

Ветка complete может быть использована для обработки случая, когда все футуры, бывшие в select, завершились и уже не могут прогрессировать. Это бывает удобно, когда select! используется в цикле.


#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

Взаимодействие с Unpin и FusedFuture

Одна вещь, на которую вы могли обратить внимание в первом примере, это то, что мы вызвали .fuse() для футур, возвращённых из двух async fn, а потом закрепили их с помощью pin_mut. Оба этих вызова важны, потому что футуры, используемые в select, должны реализовывать и типаж Unpin, и типаж FusedFuture.

Unpin важен по той причине, что футуры, используемые в select, берутся не по значению, а по изменяемой ссылке. Так как владение футурами никому не передано, незавершённые футуры могут быть снова использованы после вызова select.

Аналогично, типаж FusedFuture необходим, так как select не должен опрашивать футуры после их завершения. FusedFuture реализуется футурами, которые отслеживают, завершены ли они или нет. Это делает возможным использование select в цикле, опрашивая только те футуры, которые до сих пор не завершились. Это можно увидеть в примере выше, где a_fut или b_fut будут завершены во второй раз за цикл. Так как футура, возвращённая future::ready, реализует FusedFuture, она может сообщить select, что её не надо снова опросить.

Заметьте, что у стримов есть соответствующий типаж FusedStream. Стримы, реализующие этот типаж или имеющие обёртку, созданную .fuse(), возвращают FusedFuture из комбинаторов .next() и .try_next().


#![allow(unused)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

Распараллеливание задач в цикле с select с помощью Fuse и FuturesUnordered

Одна довольно труднодоступная, но удобная функция - Fuse::terminated(), которая позволяет создавать уже прекращённые пустые футуры, которые в последствии могут быть заполнены другой футурой, которую надо запустить.

Это может быть удобно, когда есть задача, которую надо запустить в цикле в select, но которая создана вне этого цикла.

Обратите внимание на функцию .select_next_some(). Она может использоваться с select для запуска полученных из стрима тех ветвей, которые имеют значение Some(_), а не None.


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

Когда надо одновременно запустить много копий какой-либо футуры, используйте тип FuturesUnordered. Следующий пример похож на один из тех, что был выше, но здесь мы дожидаемся завершения каждой выполненной копии run_on_new_num_fut, а не останавливаем её при создании новой. Он также отобразит значение, возвращённое run_on_new_num_fut.


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}

Обходные пути, которые мы понимаем и любим

Поддержка async в Rust всё ещё довольно нова и некоторые востребованные функции активно разрабатываются, а некоторые диагностики до сих пор не полноценны. В этой главе обсуждаются некоторые болевые точки и объясняется как с ними работать.

Ошибки вывода для возвращаемых типов

В типичной функции на Rust, возврат значения неправильного типа приведёт к тому, что мы увидим примерно такую ошибку:

error[E0308]: mismatched types
 --> src/main.rs:2:12
  |
1 | fn foo() {
  |           - expected `()` because of default return type
2 |     return "foo"
  |            ^^^^^ expected (), found reference
  |
  = note: expected type `()`
             found type `&'static str`

Однако текущая версия async fn не знает как "доверять" возвращаемому типу, записанному в сигнатуре функции, что приводит к не совпадающим или reversed-sounding ошибкам. Например, для функции async fn foo() { "foo" } будет следующая ошибка:

error[E0271]: type mismatch resolving `<impl std::future::Future as std::future::Future>::Output == ()`
 --> src/lib.rs:1:16
  |
1 | async fn foo() {
  |                ^ expected &str, found ()
  |
  = note: expected type `&str`
             found type `()`
  = note: the return type of a function must have a statically known size

Ошибка говорит, что ожидает &str, а находит (), что совершенно противоположно тому, что мы хотим. Это потому, что компилятор некорректно разрешает телу функции вернуть корректный тип.

Временным решением для этой проблемы является признание факта, что ошибка, указывающая на сигнатуру функции с сообщением "expected SomeType, found OtherType", обычно показывает, что один или несколько возвращаемых вариантов не корректны.

Исправление этой ошибки отслеживается здесь.

Box<dyn Trait>

Аналогично, так как возвращаемый тип из сигнатуры функции не распространяется должным образом, значение, которое возвращаетasync fn не правильно приводится к ожидаемому типу.

На практике, это означает, что возвращаемый из async fn объект Box<dyn Trait> требует ручного преобразования при помощи as из Box<MyType> в Box<dyn Trait>.

Этот код приведёт к ошибке:

async fn x() -> Box<dyn std::fmt::Display> {
    Box::new("foo")
}

Временным решением для этого будет ручное преобразование с использованием as:

async fn x() -> Box<dyn std::fmt::Display> {
    Box::new("foo") as Box<dyn std::fmt::Display>
}

Исправление этой ошибки отслеживается здесь.

? в async блоках

Как и в async fn, ? также может использоваться внутри async блоков. Однако возвращаемый тип async блоков явно не указывается. Это может привести тому, что компилятор не сможет определить тип ошибки async блока.

Например, этот код:


#![allow(unused)]
fn main() {
let fut = async {
    foo().await?;
    bar().await?;
    Ok(())
};
}

вызовет ошибку:

error[E0282]: type annotations needed
 --> src/main.rs:5:9
  |
4 |     let fut = async {
  |         --- consider giving `fut` a type
5 |         foo().await?;
  |         ^^^^^^^^^^^^ cannot infer type

К сожалению, сейчас не способа "задать тип для fut" кроме как явно указать возвращаемый тип async блока. Для обработки этого, используйте "turbofish" оператор для предоставления типов ошибки и успеха async блока:


#![allow(unused)]
fn main() {
let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), MyError>(()) // <- обратите внимание на явное указание типа
};
}

Send Approximation

Некоторые машины состояний асинхронных функций безопасны для передачи между потокам, в то время как другие - нет. Так или иначе, async fn Future является Send если тип, содержащийся в .await, тоже Send. Компилятор делает всё возможное, чтобы при близиться к моменту, когда значения могут удерживаться в .await, но сейчас в некоторых местах этот анализ слишком консервативен.

Например, рассмотрим простой не-Send тип, например, содержащий Rc:


#![allow(unused)]
fn main() {
use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);
}

Переменные типа NotSend могут появляться как временные внутри async fn даже когда тип Future, возвращаемой из async fn должен быть Send:

async fn bar() {}
async fn foo() {
    NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

Но если мы изменим foo таким образом, что она будет хранить NotSend в переменной, пример не скомпилируется:


#![allow(unused)]
fn main() {
async fn foo() {
    let x = NotSend::default();
    bar().await;
}
}
error[E0277]: `std::rc::Rc<()>` cannot be sent between threads safely
  --> src/main.rs:15:5
   |
15 |     require_send(foo());
   |     ^^^^^^^^^^^^ `std::rc::Rc<()>` cannot be sent between threads safely
   |
   = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<()>`
   = note: required because it appears within the type `NotSend`
   = note: required because it appears within the type `{NotSend, impl std::future::Future, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]>`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `impl std::future::Future`
note: required by `require_send`
  --> src/main.rs:12:1
   |
12 | fn require_send(_: impl Send) {}
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.

Эта ошибка корректна. Если мы сохраним x в переменную, она не будет удалена пока не будет завершён .await. В этот момент async fn может быть запущена в другом потоке. Так как Rc не является Send, перемещение между потоками будет некорректным. Простым решением будет вызов drop у Rc до вызова .await, но к сожалению пока что это не работает.

Для того, чтобы успешно обойти эту проблему, вы можете создать блок, инкапсулирующий любые не-Send переменные. С помощью этого, компилятору будет проще понять, что такие переменные не переживут момент вызова .await.


#![allow(unused)]
fn main() {
async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}
}

Рекурсия

Под капотом, async fn создаёт тип конечного автомата, содержащий каждую подфутуру для который был вызван .await. Из-за этого рекурсивные async fn становятся более сложными, так как итоговый конечный автомат содержит сам себя:


#![allow(unused)]
fn main() {
// Эта функция:
async fn foo() {
    step_one().await;
    step_two().await;
}
// создаёт типы, подобные следующим:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}

// А эта функция:
async fn recursive() {
    recursive().await;
    recursive().await;
}

// создаёт такие типы:
enum Recursive {
    First(Recursive),
    Second(Recursive),
}
}

Это не будет работать - мы создали тип бесконечного размера! Компилятор будет жаловаться:

error[E0733]: recursion in an `async fn` requires boxing
 --> src/lib.rs:1:22
  |
1 | async fn recursive() {
  |                      ^ an `async fn` cannot invoke itself directly
  |
  = note: a recursive `async fn` must be rewritten to return a boxed future.

Чтобы исправить это, мы должны ввести косвенность при помощи Box. К сожалению, из-за ограничений компилятора, обернуть вызов recursive() в Box::pin не достаточно. Чтобы это заработало, мы должны сделать recursive не асинхронной функцией, которая возвращает .boxed() с async блоком:


#![allow(unused)]
fn main() {
use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}
}

async в типажах

В настоящий момент async fn не могут использоваться в типажах. Причиной является большая сложность, но снятие этого ограничения находится в планах на будущее.

Однако вы можете обойти это ограничение при помощи пакета async_trait с crates.io.

Заметьте, что использование этих методов типажей приведёт к выделениям памяти в куче для каждого вызова функции. Это не значительная стоимость для большинства приложений, но следует это учитывать при принятии решения использовать данную функциональность в публичном API низкоуровневых функций, которые будут вызываться миллион раз в секунду.