Начало работы

translated

Добро пожаловать в книгу "Асинхронное программирование в Rust"! Если вы собираетесь начать писать асинхронный код на Rust, вы находитесь в правильном месте. Строите ли вы веб-сервер, базу данных или операционную систему, эта книга покажет вам как использовать инструменты асинхронного программирования, чтобы получить максимальную отдачу от вашего оборудования.

Что охватывает эта книга?

Эта книга призвана стать исчерпывающим, современным руководством по использованию асинхронных языковых возможностей и библиотек Rust, подходящим как новичкам, так и опытным разработчикам.

  • Ранние главы содержат введение в асинхронное программирование в целом, а также его особенности в Rust.

  • В средних главах обсуждаются ключевые утилиты и инструменты управления потоком, которые вы можете использовать, когда пишете асинхронный код. Также здесь описаны лучшие практики структурирования библиотек и приложений для получения максимальной производительности и повторного использования кода.

  • Последняя глава книги покрывает асинхронную экосистему и предоставляет ряд примеров того, как можно выполнить общие задачи.

Итак, давайте исследуем захватывающий мир асинхронного программирования в Rust!

Для чего нужна асинхронность?

Все мы любим то, что Rust позволяет нам писать быстрые и безопасные приложения. Но для чего писать асинхронный код?

Асинхронный код позволяет нам запускать несколько задач параллельно в одном потоке ОС. Если вы в обычном приложении хотите одновременно загрузить две разных web-страницы, вы должны разделить работу между двумя разным потоками, как тут:

fn get_two_sites() {
    // Spawn two threads to do work.
    let thread_one = thread::spawn(|| download("https://www.foo.com"));
    let thread_two = thread::spawn(|| download("https://www.bar.com"));

    // Wait for both threads to complete.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

Для многих приложений это замечательно работает - в конце концов, потоки были разработаны именно для этого: одновременно запускать несколько разных задач. Однако, они имеют некоторые ограничения. В процессе переключения между разными потоками и обменом данными между ними возникает много накладных расходов. Даже поток, который сидит и ничего не делает, использует ценные системные ресурсы. Асинхронный код предназначен для устранения этих проблем. Мы можем переписать функции выше используя нотацию async/.await, которая позволяет нам запустить несколько задач одновременно, не создавая нескольких потоков:

async fn get_two_sites_async() {
    // Create two different "futures" which, when run to completion,
    // will asynchronously download the webpages.
    let future_one = download_async("https://www.foo.com");
    let future_two = download_async("https://www.bar.com");

    // Run both futures to completion at the same time.
    join!(future_one, future_two);
}

В целом, асинхронные приложения могут быть намного быстрее и использовать меньше ресурсов, чем соответствующая многопоточная реализация. Однако, есть и обратная сторона. Потоки изначально поддерживаются операционной системой и их использование не требует какой-либо специальной модели программирования - любая функция может создать поток и вызов функции, использующей потоки, обычно так же прост, как вызов обычной функции. Тем не менее, асинхронные функции требуют специальной поддержки со стороны языка или библиотек. В Rust, async fn создаёт асинхронную функцию, которая возвращает Future. Для выполнения тела функции, возвращённая Future должна быть завершена.

Важно помнить, что традиционные приложения с потоками могут быть вполне эффективными и предсказуемость Rust и небольшой объём используемой памяти могут значить, что вы можете далеко продвинуться и без использования async. Повышенная сложность асинхронной модели программирования не всегда стоит этого и важно понимать, когда ваше приложение будет лучше работать с использованием простой поточной модели.

Состояние асинхронности в Rust

Асинхронная экосистема Rust претерпела большие изменения с течением времени, поэтому может быть трудно понять, какие инструменты использовать, в какие библиотеки инвестировать, или какую документацию читать. Однако типаж Future внутри стандартной библиотеки и async/await в языке были недавно стабилизированы. Таким образом, экосистема в целом находится в процессе миграции к недавно стабилизированному API, после чего точка оттока будет значительно уменьшена.

Тем не менее, сейчас экосистема всё ещё находится в стадии активной разработки и асинхронный опыт в Rust не отполирован. Многие библиотеки до сих пор используют пакет futures версии 0.1, а это значит, что для взаимодействия с ними разработчикам часто требуется функциональность compat из пакета futures версии 0.3. Синтаксис async/await до сих пор достаточно нов. Важные расширения синтаксиса, такие как async fn для методов типажей, до сих пор не реализованы, и текущие сообщения компилятора об ошибках могут быть сложны для восприятия.

Это говорит о том, что Rust на пути к более эффективной и эргономичной поддержке асинхронного программирования и если вы не боитесь изменений, наслаждайтесь погружением в мир асинхронного программирования в Rust!

Пример async/.await

async/.await - инструменты для написания асинхронного кода встроенные в Rust, внешне похожие на синхронный код. Ключевое слово async превращает исполняемый блок программы в конечный автомат, который реализует типаж Future. В синхронном методе вызов функции блокирует весь поток, в асинхронном же вызов через Future вернёт контроль над потоком, позволяя работать другим Future.

Добавим некоторые зависимости в файл Cargo.toml:

[dependencies]
futures = "0.3"

При реализации асинхронной функции можно использовать такой синтаксис async fn:


#![allow(unused)]
fn main() {
async fn do_something() { /* ... */ }
}

async fn возвращает значение, которые является Future. Что бы ни произошло, Future должна быть запущена в исполнителе.

// `block_on` blocks the current thread until the provided future has run to
// completion. Other executors provide more complex behavior, like scheduling
// multiple futures onto the same thread.
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // Nothing is printed
    block_on(future); // `future` is run and "hello, world!" is printed
}

Внутри async fn можно использовать .await для ожидания завершения другой реализации типажа Future, например, полученной из другой async fn). В отличие от block_on, .await не блокирует текущий поток, а асинхронно ожидает завершения футуры, позволяя другим задачам в потоке выполняться, пока эта футура не может быть исполнена.

Например, представим что у нас есть три асинхронные функции async fn: learn_song, sing_song и dance:

async fn learn_song() -> Song { ... }
async fn sing_song(song: Song) { ... }
async fn dance() { ... }

Одним из способов выполнения функций «разучить песню», «спеть» и «станцевать» будет блокировка потока исполнения на каждой из них индивидуально:

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

В этом случае мы не достигаем наилучшей производительности - в один момент времени мы делаем только одно дело! Конечно, мы должны выучить песню до того, как петь её, но мы можем танцевать одновременно с разучиванием песни и пением. Чтобы провернуть такой вариант, создадим две отдельные функции с async fn, которые могут запуститься параллельно:

async fn learn_and_sing() {
    // Wait until the song has been learned before singing it.
    // We use `.await` here rather than `block_on` to prevent blocking the
    // thread, which makes it possible to `dance` at the same time.
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` is like `.await` but can wait for multiple futures concurrently.
    // If we're temporarily blocked in the `learn_and_sing` future, the `dance`
    // future will take over the current thread. If `dance` becomes blocked,
    // `learn_and_sing` can take back over. If both futures are blocked, then
    // `async_main` is blocked and will yield to the executor.
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

В этом примере, разучивание песни должно быть завершено до пения, но разучивание и пение могут завершиться одновременно с танцем. Если бы мы использовали block_on(learn_song()) вместо learn_song().await внутри learn_and_sing, поток не смог бы делать ничего другого, пока работает learn_song. Из-за этого мы одновременно с этим не можем танцевать. С помощью ожидания .await футурыlearn_song, мы разрешаем другим задачам захватить текущий поток исполнения, пока learn_song заблокирована. Это даёт возможность запускать нескольких футур в одном потоке параллельно.

Под капотом: выполнение Future и задач

В этом разделе мы рассмотрим как планируются Future и асинхронные задачи. Если вам только интересно изучить как писать высокоуровневый код, который использует существующие типы Future, и не интересно, как работает Future, то можете сразу перейти к главе async/await. Тем не менее, некоторые темы, которые обсуждаются в этой главе, полезны для понимания работы async/await кода и построения новых асинхронных примитивов. Если сейчас вы решили пропустить этот раздел, вы можете добавить его в закладки, чтобы вернуться к нему в будущем.

Теперь давайте рассмотрим типаж Future.

Типаж Future

Типаж Future является центральным для асинхронного программирования в Rust. Future - это асинхронное вычисление, которое может производить значение (хотя значение может быть и пустым, например ()). Упрощённый вариант этого типажа может выглядеть как-то так:


#![allow(unused)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

Футуры могут прогрессировать при помощи функции poll, которая продвигает их к завершению на сколько это возможно. Если футура завершается, она возвращает Poll::Ready(result). Если же она всё ещё не готова завершиться, то - Poll::Pending и обрабатывает функцию wake() таким образом, что она будет вызвана, когда Future будет готова прогрессировать. Когда wake() вызван, исполнитель снова вызывает у Future метод poll, чтобы она смогла продвинуться далее.

Без wake(), исполнитель не имеет возможности узнать, когда какая-либо футура может продвинуться, и ему необходимо постоянно опрашивать каждую футуру. С wake() он точно знает какие футуры готовы прогрессировать.

Например, представим ситуацию, когда мы хотим прочитать из сокета, который может иметь, а может и не иметь данных. Если данные есть, мы можем прочитать их и вернуть Poll::Ready(data), но если данных ещё нет, наша футура блокируется и не может продвинуться дальше. Если данных нет, то мы должны зарегистрировать вызов wake, чтобы он был вызван, когда данные появятся в сокете, и сообщил нашему исполнителю, что футура готова прогрессировать. Простая футура SocketRead может выглядеть следующим образом:


#![allow(unused)]
fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
}

Такая модель футур позволяет держать вместе несколько асинхронных операций без лишних промежуточных выделений памяти. Одновременный запуск нескольких футур или соединение их в цепочку может быть реализовано при помощи не выделяющей памяти машины состояний, например так:


#![allow(unused)]
fn main() {
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}
}

Здесь показано, как несколько футур могут быть запущены одновременно без необходимости раздельной аллокации, позволяя асинхронным программам быть более эффективными. Аналогично, несколько последовательных футур могут быть запущены одна за другой, как тут:


#![allow(unused)]
fn main() {
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}
}

Этот пример показывает, как типаж Future может использоваться для выражения асинхронного управления потоком без необходимости множественной аллокации объектов и глубоко вложенных замыканий. Давайте оставим базовое управление потоком в стороне и поговорим о реальном типаже Future и чем он отличается от написанного нами.


#![allow(unused)]
fn main() {
trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
}

Первое, что вы могли заметить, что наш тип self больше не &mut self, а заменён на Pin<&mut Self>. Мы поговорим о закреплении (pinning) в следующей секции, но пока что знайте, что оно позволяет нам создавать неперемещаемые футуры. Неперемещаемые объекты могут хранить указатели на собственные поля, например struct MyFut { a: i32, ptr_to_a: *const i32 }. Закрепление необходимо для async/await.

Второе, wake: fn() была изменена на &mut Context<'_>. В SimpleFuture мы использовали вызов указателя на функцию (fn()), чтобы сказать исполнителю, что футура должна быть опрошена. Однако, так как fn() имеет нулевой тип, она не может сохранить информацию о том какая футура вызвала wake.

В примере из реального мира, сложное приложение, такое как web-сервер, может иметь тысячи различных подключений все пробуждения которых должны обрабатываться отдельно. Тип Context решает это предоставляя доступ к значению типа Waker, который может быть использован для пробуждения конкретной задачи.

Вызовы задачи при помощи Waker

Обычно футуры не могут разрешиться сразу же, как для них вызвали метод poll. Когда это произойдёт, футура должна быть уверена, что её снова опросят, когда она будет готова прогрессировать. Это решается при помощи типа Waker.

Футура опрашивается как часть "задачи" каждый раз, когда происходит её опрос. Задачи - это высокоуровневые футуры, с которыми работает исполнитель.

Waker предоставляет метод wake(), который может быть использован, чтобы сказать исполнителю, что соответствующая задача должна быть пробуждена. Когда вызывается wake(), исполнитель знает, что задача, связанная с Waker, готова к выполнению, и её футура должна быть опрошена снова.

Waker так же реализует clone(), так что вы можете его копировать, где это необходимо, и хранить.

Давайте попробуем реализовать простой таймер с использованием Waker.

Применение: Создание таймера

В качестве примера, мы просто раскручиваем новый поток при создании таймера, спим в течение необходимого времени, а затем через какое-то время сообщаем о том, что заданный временной промежуток истёк.

Вот список импортов, которые нам понадобятся:


#![allow(unused)]
fn main() {
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
}

Начнём с определения типа футуры. Нашей футуре необходим канал связи, чтобы сообщить о том, что время таймера истекло и футура должна завершиться. В качестве канала связи между таймером и футурой мы будем использовать разделяемое значение Arc<Mutex<..>>.


#![allow(unused)]
fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}
}

Теперь давайте реализуем Future для нашей футуры!


#![allow(unused)]
fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
}

Просто, не так ли? Если поток установит shared_state.completed = true, мы закончили! В противном случае мы клонируем Waker для текущей задачи и сохраняем его в shared_state.waker. Так поток может разбудить задачу позже.

Важно отметить, что мы должны обновлять Waker каждый раз, когда футура опрашивается, потому что она может быть перемещена в другую задачу с другим Waker. Это может произойти когда футуры после опроса передаются между задачами.

Наконец, нам нужен API, чтобы фактически построить таймер и запустить поток:


#![allow(unused)]
fn main() {
impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
}

Это всё, что нам нужно, чтобы построить простую футуру таймером. Теперь нам нужен исполнитель, чтобы запустить её на исполнение.

Применение: создание исполнителя

Футуры Rust'a ленивы: они ничего не будут делать, если не будут активно доводиться до завершения. Один из способов довести их до завершения - это .await внутри async функции, но это просто подталкивает проблему на один уровень вверх: кто будет запускать футуры, возвращённые из async функций верхнего уровня? Ответ в том, что нам нужен исполнитель для Future.

Исполнители берут набор футур верхнего уровня и запускают их через вызов метода poll, до тех пока они не завершатся. Как правило, исполнитель будет вызывать метод poll у футуры один раз, чтобы запустить. Когда футуры сообщают, что готовы продолжить вычисления при вызове метода wake(), они помещаются обратно в очередь и вызов poll повторяется до тех пор, пока Future не будут завершены.

В этом разделе мы напишем нашего собственного простого исполнителя, способного одновременно запускать большое количество футур верхнего уровня.

Для этого примера мы используем пакет futures, в котором определён типаж ArcWake. Данный типаж предоставляет простой способ создания Waker.

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures = "0.3"

Дальше, мы должны в верхней части src/main.rs импортировать следующее:

use {
    futures::{
        future::{BoxFuture, FutureExt},
        task::{waker_ref, ArcWake},
    },
    std::{
        future::Future,
        sync::mpsc::{sync_channel, Receiver, SyncSender},
        sync::{Arc, Mutex},
        task::{Context, Poll},
        time::Duration,
    },
    // The timer we wrote in the previous section:
    timer_future::TimerFuture,
};

Наш исполнитель будет работать, посылая задачи для запуска по каналу. Исполнитель извлечёт события из канала и запустит их. Когда задача будет готова выполнить больше работы (будет пробуждена), она может запланировать повторный опрос самой себя, отправив себя обратно в канал.

В этом проекте самому исполнителю просто необходим получатель для канала задачи. Пользователь получит экземпляр отправителя, чтобы он мог создавать новые футуры. Сами задачи - это просто футуры, которые могут перезапланировать самих себя, поэтому мы сохраним их как сочетание футуры и отправителя, который задача может использовать, чтобы добавить себя в очередь.

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

Давайте ещё добавим метод в spawner, чтобы было проще создавать новые футуры. Этот метод возьмёт тип футуры, поместит его в коробку и создаст новую задачу Arc<Task> с ним внутри. И эта задача может быть помещена в очередь для исполнителя.

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

Чтобы опросить futures, нам нужно создать Waker. Как описано в разделе задачи пробуждения, Wakers отвечают за планирование задач, которые будут опрошены снова после вызова wake. Wakers сообщают исполнителю, какая именно задача завершилась, позволяя опрашивать как раз те futures, которые готовы к продолжению выполнения. Простой способ создать новый Waker, необходимо реализовать типаж ArcWake, а затем использовать waker_ref или .into_waker() функции для преобразования Arc<impl ArcWake> в Waker. Давайте реализуем ArcWake для наших задач, чтобы они были превращены в Wakers и могли пробуждаться:

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

Когда Waker создаётся на основе Arc<Task>, вызывая wake(), это вызовет отправку копии Arc в канал задач. Тогда нашему исполнителю нужно подобрать задание и опросить его. Давайте реализуем это:

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

Поздравляю! Теперь у нас есть работающий исполнитель futures. Мы даже можем использовать его для запуска async/.await кода и пользовательских futures, таких как TimerFuture которую мы описали ранее:

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}

Исполнители и системный ввод/вывод

В главе "Типаж Future", мы обсуждали футуру, которая выполняет асинхронное чтение сокета:


#![allow(unused)]
fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
}

Эта футура читает из сокета доступные данные и если таковых нет, то она передаётся исполнителю с запросом активирования задачи, когда сокет снова станет читаемым. Однако, из текущего примера не ясна реализация типа Socket и, в частности, не совсем очевидно как работает функция set_readable_callback. Как мы можем сделать так, чтобы lw.wake() был вызван, когда сокет станет читаемым? Один из вариантов - иметь поток, который постоянно проверяет стал ли socket читаемым, вызывая при необходимости метод wake(). Тем не менее, такой подход будет весьма не эффективным, так как он требует отдельного потока для каждой блокирующей IO футуры. Это значительно снизит эффективность нашего асинхронного кода.

На практике эта проблема решается при помощи интеграции с IO-зависимыми системными блокирующими примитивами такими, как epoll в Linux, kqueue во FreeBSD и Mac OS, IOCP в Windows и port в Fuchsia (все они предоставляются при помощи кроссплатформенного Rust-пакета mio). Все эти примитивы позволяют потоку заблокироваться с несколькими асинхронными IO-событиями, возвращая одно из завершённых событий. На практике эти API выглядят примерно так:


#![allow(unused)]
fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // ID уникально идентифицирующий событие, которое уже произошло и на которое мы подписались
    id: usize,

    // Набор сигналов, которые мы ожидаем или которые произошли
    signals: Signals,
}

impl IoBlocker {
    /// Создаём новую коллекцию асинхронных IO-событий для блокировки
    fn new() -> Self { ... }

    /// Подпишемся на определённое IO-событие.
    fn add_io_event_interest(
        &self,

        /// Объект, на котором происходит событие
        io_object: &IoObject,

        /// Набор сигналов, которые могут применяться к `io_object`,
        /// для которого должно быть инициировано событие, в паре с
        /// ID, которые передадутся событиям, получившимся в результате нашей подписки.
        event: Event,
    ) { ... }

    /// Заблокируется до появления одного из событий
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// выведет что-то похожее на "Socket 1 is now READABLE", если сокет станет доступным для чтения.
println!("Socket {:?} is now {:?}", event.id, event.signals);
}

Исполнители футур могут использовать эти примитивы для предоставления асинхронных объектов ввода-вывода, таких как сокеты, которые могут настроить обратные вызовы для запуска при определённом IO-событии. В случае нашего примера c SocketRead, функция Socket::set_readable_callback может выглядеть следующим псевдокодом:


#![allow(unused)]
fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` является ссылкой на локальный исполнитель.
        // Это может быть предусмотрено при создании сокета, 
        // большинство реализаций исполнителей делают это через локальный поток, так удобнее.
        let local_executor = self.local_executor;

        // Уникальный ID для объекта ввода вывода.
        let id = self.id;

        // Сохраним `waker` в данных исполнителя,
        // чтобы его можно было вызвать после того, как будет получено событие.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
}

Теперь у нас может быть только один поток исполнителя, который может принимать и отправлять любые события ввода-вывода в нужный Waker, который разбудит соответствующую задачу, позволяющая исполнителю довести больше задач до завершения перед возвратом к проверке новых событий ввода-вывода (и цикл продолжается...).

async/await

В первой главе мы бросили беглый взгляд на async/.await и использовали их чтобы построить простой сервер. В этой главе мы обсудим async/.await более подробно, объясняя, как они работают и как async-код отличается от традиционных программ на Rust.

async/.await - это специальный синтаксис Rust, который позволяет не блокировать поток, а передавать управление другому коду, пока ожидается завершение операции.

Существует два основных способа использования async: async fn и async-блоки. Каждый возвращает значение, реализующее типаж Future:


#![allow(unused)]

fn main() {
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    async {
        let x: u8 = foo().await;
        x + 5
    }
}
}

Как мы видели в первой главе, async-блоки и другие футуры ленивы: они ничего не делают, пока их не запустят. Наиболее распространённый способ запустить Future - это .await. Когда .await вызывается на Future, он пытается выполнить её до конца. Если Future заблокирована, то контроль будет передан текущему потоку. Чтобы добиться большего прогресса, будет выбрана верхняя Future исполнителя, позволяя .await продолжить работу.

Времена жизни async

В отличие от традиционных функций, async fn, которые принимают ссылки или другие не-'static аргументы, возвращают Future, которая ограничена временем жизни аргумента:


#![allow(unused)]
fn main() {
// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}
}

Это означает, что для футуры, возвращаемая из async fn, должен быть вызван .await до тех пор, пока её не-'static аргументы все ещё действительны. В общем случае, вызов .await у футуры сразу после вызова функции (как в foo(&x).await), не является проблемой. Однако, проблемой может оказаться сохранение футуры или отправка её в другую задачу или поток.

Один общий обходной путь для включения async fn со ссылками в аргументах в 'static футуру состоит в том, чтобы связать аргументы с вызовом async fn внутри async-блока:


#![allow(unused)]
fn main() {
fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}
}

Перемещая аргумент в async-блок, мы продлеваем его время жизни до времени жизни Future, которая возвращается при вызове good.

async move

async-блоки и async-замыкания позволяют использовать ключевое слово move, как и в обычном замыкании. async move блок получает владение переменными со ссылками, позволяя им пережить текущую область, но отказывая им в возможности делиться этими переменными с другим кодом:


#![allow(unused)]
fn main() {
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{}", my_string);
    };

    let future_two = async {
        // ...
        println!("{}", my_string);
    };

    // Run both futures to completion, printing "foo" twice:
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{}", my_string);
    }
}
}

.await в многопоточном исполнителе

Обратите внимание, что при использовании Future в многопоточном исполнителе, Future может перемещаться между потоками. Поэтому любые переменные, используемые в телах async, должны иметь возможность перемещаться между потоками, так как любой .await потенциально может привести к переключению на новый поток.

Это означает, что не безопасно использовать Rc, &RefCell или любые другие типы, не реализующие типаж Send (включая ссылки на типы, которые не реализуют типаж Sync).

(Предостережение: можно использовать эти типы до тех пор, пока они не находятся в области действия вызова .await.)

Точно так же не очень хорошая идея держать традиционную non-futures-aware блокировку через .await, так как это может привести к блокировке пула потоков: одна задача может получить объект блокировки, вызвать .await и передать управление исполнителю, разрешив другой задаче совершить попытку взять блокировку, что вызовет взаимную блокировку. Чтобы избежать этого, используйте Mutex из futures::lock, а не из std::sync.

Закрепление (pinning)

Чтобы опросить футуры, они должны быть закреплены с помощью специального типа Pin<T>. Если вы прочитали объяснение Future в предыдущем разделе «Выполнение Future и задач», вы узнаете Pin из self: Pin<&mut Self> в определении метода Future::poll. Но что это значит, и зачем нам это нужно?

Для чего нужно закрепление

Закрепление даёт возможность гарантировать, что объект не будет перемещён. Чтобы понять почему это важно, нам надо помнить как работает async/.await. Рассмотрим следующий код:

let fut_one = /* ... */;
let fut_two = /* ... */;
async move {
    fut_one.await;
    fut_two.await;
}

Под капотом, он создаёт анонимный тип, который реализует типаж Future, предоставляющий метод poll, выглядящий примерно так:

// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl Future for AsyncFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

Когда poll вызывается первый раз, он опрашивает fut_one. Если fut_one не завершена, возвращается AsyncFuture::poll. Следующие вызовы poll будут начинаться там, где завершился предыдущий вызов. Этот процесс продолжается до тех пор, пока future не сможет завершиться.

Однако, что будет, если async блок использует ссылки? Например:

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

Во что скомпилируется эта структура?

struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // указывает на `x` далее
}

struct AsyncFuture {
     x: [u8; 128],
     read_into_buf_fut: ReadIntoBuf<'?>, // какое тут время жизни?
}

Здесь future ReadIntoBuf содержит ссылку на другое поле нашей структуры, x. Однако, если AsyncFuture будет перемещена, положение x тоже будет изменено, что инвалидирует указатель, сохранённый в read_into_buf_fut.buf.

Закрепление футур в определённом месте памяти предотвращает эту проблему, делая безопасным создание ссылок на данные за пределами async блока.

Как использовать закрепление

Тип Pin оборачивает указатель на другие типы, гарантируя, что значение за указателем не будет перемещено. Например, Pin<&mut T&gt;, Pin<&T&gt;, Pin<Box<T>&gt; - все гарантируют, что положение T останется неизменным.

У большинства типов нет проблем с перемещением. Эти типы реализуют типаж Unpin. Указатели на Unpin-типы могут свободно помещаться в Pin или извлекаться из него. Например, тип u8 реализует Unpin, таким образом Pin<&mut u8> ведёт себя также, как и &mut u8.

Некоторые функции требуют, чтобы футуры, с которыми они работают, были Unpin. Чтобы использовать Future или Stream, который не реализует Unpin, с функцией, которая требует Unpin-типы, сначала нужно закрепить значение, используя либо Box::pin (чтобы создать Pin<Box<T>>) или макрос pin_utils::pin_mut! (чтобы создать Pin<&mut T>). Pin<Box<Fut>> и Pin<&mut Fut> могут быть использованы как футура и оба реализуют Unpin.

Например:

use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io

// A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { /* ... */ }

let fut = async { /* ... */ };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait

// Pinning with `Box`:
let fut = async { /* ... */ };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK

// Pinning with `pin_mut!`:
let fut = async { /* ... */ };
pin_mut!(fut);
execute_unpin_future(fut); // OK

Типаж Stream

Типаж Stream похож на Future, но до своего завершения может давать несколько значений. Также он похож на типаж Iterator из стандартной библиотеки:


#![allow(unused)]
fn main() {
trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}
}

Одним из распространённых примеров Stream является Receiver для типа канала из пакета futures. Он даёт Some(val) каждый раз, когда значение отправляется от Sender, и даст None после того, как Sender был удалён из памяти и все ожидающие сообщения были получены:


#![allow(unused)]
fn main() {
async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}
}

Итерирование и параллелизм

Подобно синхронным итераторам, существует множество различных способов итерации и обработки значений в Stream. Существуют методы комбинаторного стиля например, map, filter и fold и их братьев раннего-выхода-из-за-ошибки try_map, try_filter и try_fold.

К сожалению, цикл for не может использоваться для Stream, но для императивного стиля написания кода, могут быть использованы while let и функции next/try_next:


#![allow(unused)]
fn main() {
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}
}

Однако, если мы просто обрабатываем один элемент за раз, мы потенциально оставляем возможность для параллелизма, который, в конце концов, стоит на первом месте при написании асинхронного кода. Для обработки нескольких элементов из потока одновременно, используйте методы for_each_concurrent и try_for_each_concurrent:


#![allow(unused)]
fn main() {
async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}
}

Одновременное выполнение нескольких Future

До этого времени, мы в основном выполняли футуры используя .await, который блокирует текущую задачу до тех пор, пока не завершится отдельная Future. Однако, настоящие асинхронные приложения чаще всего должны выполнять несколько различных операций одновременно.

В этой главе мы рассмотрим несколько вариантов одновременного выполнения нескольких асинхронных операций:

  • join!: ждёт завершения всех футур
  • select!: ждёт завершения одной из футур
  • Spawning: создаёт высокоуровневые задачи, которые запускают содержащиеся в них футуры до их завершения
  • FuturesUnordered: группа футур, которые отдают результат каждой субфутуры

join!

Макрос futures::join позволяет дождаться завершения нескольких разных футур при одновременном их выполнении.

join!

При выполнении нескольких асинхронных операций возникает соблазн просто последовательно вызвать несколько .await:


#![allow(unused)]
fn main() {
async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}
}

Однако это будет медленнее, чем необходимо, так как он не начнёт пытаться выполнять get_music до завершения get_book. В некоторых других языках, футуры выполняются до завершения, поэтому две операции могут быть запущены одновременно сначала вызовом каждой async fn, для запуска футур, а потом их ожиданием:


#![allow(unused)]
fn main() {
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}
}

Однако футуры на Rust не будут работать, пока для них не будет вызван .await. Это означает, что оба приведённых выше фрагмента кода запустят book_future и music_future последовательно, вместо того, чтобы запустить их параллельно. Чтобы правильно распараллелить их выполнение, используйте futures::join!:


#![allow(unused)]
fn main() {
use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}
}

Значение, возвращаемое join! - это кортеж, содержащий выходные данные каждой из переданных Future.

try_join!

Для футур, которые возвращают Result, может использоваться try_join!, а не join!. Так как join! завершается только после завершения всех подфутур, он будет продолжать обрабатывать другие футуры даже после того, как одна из подфутур вернёт Err.

В отличие отjoin!, try_join! завершится немедленно, если какая-либо из подфутур вернёт ошибку.


#![allow(unused)]
fn main() {
use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
}

Обратите внимание, что все футуры, переданные в try_join!, должны иметь один и тот же тип ошибки. Рассмотрите возможность использования функций .map_err(|e| ...) и .err_into() из futures::future::TryFutureExt для приведения типов ошибок к единому виду:


#![allow(unused)]
fn main() {
use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
}

select!

Макрос futures::select запускает несколько футур одновременно и передаёт управление пользователю, как только любая из футур завершится.


#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

Функция выше запустит обе t1 и t2 параллельно. Когда t1 или t2 закончится, соответствующий дескриптор вызовет println! и функция завершится без выполнения оставшейся задачи.

Базовый синтаксис для select: <pattern> = <expression> => <code>,, повторяемый столько раз, из скольких футур вам надо сделать select.

default => ... и complete => ...

Также select поддерживает ветки default и complete.

Ветка default выполнится, если ни одна из футур, переданных в select, не завершится. Поэтому select с веткой default всегда будет незамедлительно завершаться, так как default будет запущен, когда ещё ни одна футура не готова.

Ветка complete может быть использована для обработки случая, когда все футуры, бывшие в select, завершились и уже не могут прогрессировать. Это бывает удобно, когда select! используется в цикле.


#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

Взаимодействие с Unpin и FusedFuture

Одна вещь, на которую вы могли обратить внимание в первом примере, это то, что мы вызвали .fuse() для футур, возвращённых из двух async fn, а потом закрепили их с помощью pin_mut. Оба этих вызова важны, потому что футуры, используемые в select, должны реализовывать и типаж Unpin, и типаж FusedFuture.

Unpin важен по той причине, что футуры, используемые в select, берутся не по значению, а по изменяемой ссылке. Так как владение футурами никому не передано, незавершённые футуры могут быть снова использованы после вызова select.

Аналогично, типаж FusedFuture необходим, так как select не должен опрашивать футуры после их завершения. FusedFuture реализуется футурами, которые отслеживают, завершены ли они или нет. Это делает возможным использование select в цикле, опрашивая только те футуры, которые до сих пор не завершились. Это можно увидеть в примере выше, где a_fut или b_fut будут завершены во второй раз за цикл. Так как футура, возвращённая future::ready, реализует FusedFuture, она может сообщить select, что её не надо снова опросить.

Заметьте, что у стримов есть соответствующий типаж FusedStream. Стримы, реализующие этот типаж или имеющие обёртку, созданную .fuse(), возвращают FusedFuture из комбинаторов .next() и .try_next().


#![allow(unused)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

Распараллеливание задач в цикле с select с помощью Fuse и FuturesUnordered

Одна довольно труднодоступная, но удобная функция - Fuse::terminated(), которая позволяет создавать уже прекращённые пустые футуры, которые в последствии могут быть заполнены другой футурой, которую надо запустить.

Это может быть удобно, когда есть задача, которую надо запустить в цикле в select, но которая создана вне этого цикла.

Обратите внимание на функцию .select_next_some(). Она может использоваться с select для запуска полученных из стрима тех ветвей, которые имеют значение Some(_), а не None.


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

Когда надо одновременно запустить много копий какой-либо футуры, используйте тип FuturesUnordered. Следующий пример похож на один из тех, что был выше, но здесь мы дожидаемся завершения каждой выполненной копии run_on_new_num_fut, а не останавливаем её при создании новой. Он также отобразит значение, возвращённое run_on_new_num_fut.


#![allow(unused)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}

Обходные пути, которые мы понимаем и любим

Поддержка async в Rust всё ещё довольно нова и некоторые востребованные функции активно разрабатываются, а некоторые диагностики до сих пор не полноценны. В этой главе обсуждаются некоторые болевые точки и объясняется как с ними работать.

? в async блоках

Как и в async fn, ? также может использоваться внутри async блоков. Однако возвращаемый тип async блоков явно не указывается. Это может привести тому, что компилятор не сможет определить тип ошибки async блока.

Например, этот код:


#![allow(unused)]
fn main() {
let fut = async {
    foo().await?;
    bar().await?;
    Ok(())
};
}

вызовет ошибку:

error[E0282]: type annotations needed
 --> src/main.rs:5:9
  |
4 |     let fut = async {
  |         --- consider giving `fut` a type
5 |         foo().await?;
  |         ^^^^^^^^^^^^ cannot infer type

К сожалению, сейчас не способа "задать тип для fut" кроме как явно указать возвращаемый тип async блока. Для обработки этого, используйте "turbofish" оператор для предоставления типов ошибки и успеха async блока:


#![allow(unused)]
fn main() {
let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), MyError>(()) // <- обратите внимание на явное указание типа
};
}

Send Approximation

Некоторые машины состояний асинхронных функций безопасны для передачи между потокам, в то время как другие - нет. Так или иначе, async fn Future является Send если тип, содержащийся в .await, тоже Send. Компилятор делает всё возможное, чтобы при близиться к моменту, когда значения могут удерживаться в .await, но сейчас в некоторых местах этот анализ слишком консервативен.

Например, рассмотрим простой не-Send тип, например, содержащий Rc:


#![allow(unused)]
fn main() {
use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);
}

Переменные типа NotSend могут появляться как временные внутри async fn даже когда тип Future, возвращаемой из async fn должен быть Send:

async fn bar() {}
async fn foo() {
    NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

Но если мы изменим foo таким образом, что она будет хранить NotSend в переменной, пример не скомпилируется:


#![allow(unused)]
fn main() {
async fn foo() {
    let x = NotSend::default();
    bar().await;
}
}
error[E0277]: `std::rc::Rc<()>` cannot be sent between threads safely
  --> src/main.rs:15:5
   |
15 |     require_send(foo());
   |     ^^^^^^^^^^^^ `std::rc::Rc<()>` cannot be sent between threads safely
   |
   = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<()>`
   = note: required because it appears within the type `NotSend`
   = note: required because it appears within the type `{NotSend, impl std::future::Future, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]>`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `impl std::future::Future`
note: required by `require_send`
  --> src/main.rs:12:1
   |
12 | fn require_send(_: impl Send) {}
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.

Эта ошибка корректна. Если мы сохраним x в переменную, она не будет удалена пока не будет завершён .await. В этот момент async fn может быть запущена в другом потоке. Так как Rc не является Send, перемещение между потоками будет некорректным. Простым решением будет вызов drop у Rc до вызова .await, но к сожалению пока что это не работает.

Для того, чтобы успешно обойти эту проблему, вы можете создать блок, инкапсулирующий любые не-Send переменные. С помощью этого, компилятору будет проще понять, что такие переменные не переживут момент вызова .await.


#![allow(unused)]
fn main() {
async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}
}

Рекурсия

Под капотом, async fn создаёт тип конечного автомата, содержащий каждую подфутуру для который был вызван .await. Из-за этого рекурсивные async fn становятся более сложными, так как итоговый конечный автомат содержит сам себя:


#![allow(unused)]
fn main() {
// Эта функция:
async fn foo() {
    step_one().await;
    step_two().await;
}
// создаёт типы, подобные следующим:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}

// А эта функция:
async fn recursive() {
    recursive().await;
    recursive().await;
}

// создаёт такие типы:
enum Recursive {
    First(Recursive),
    Second(Recursive),
}
}

Это не будет работать - мы создали тип бесконечного размера! Компилятор будет жаловаться:

error[E0733]: recursion in an `async fn` requires boxing
 --> src/lib.rs:1:22
  |
1 | async fn recursive() {
  |                      ^ an `async fn` cannot invoke itself directly
  |
  = note: a recursive `async fn` must be rewritten to return a boxed future.

Чтобы исправить это, мы должны ввести косвенность при помощи Box. К сожалению, из-за ограничений компилятора, обернуть вызов recursive() в Box::pin не достаточно. Чтобы это заработало, мы должны сделать recursive не асинхронной функцией, которая возвращает .boxed() с async блоком:


#![allow(unused)]
fn main() {
use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}
}

async в типажах

В настоящий момент async fn не могут использоваться в типажах. Причиной является большая сложность, но снятие этого ограничения находится в планах на будущее.

Однако вы можете обойти это ограничение при помощи пакета async_trait с crates.io.

Заметьте, что использование этих методов типажей приведёт к выделениям памяти в куче для каждого вызова функции. Это не значительная стоимость для большинства приложений, но следует это учитывать при принятии решения использовать данную функциональность в публичном API низкоуровневых функций, которые будут вызываться миллион раз в секунду.

The Async Ecosystem

Rust currently provides only the bare essentials for writing async code. Importantly, executors, tasks, reactors, combinators, and low-level I/O futures and traits are not yet provided in the standard library. In the meantime, community-provided async ecosystems fill in these gaps.

Async Runtimes

Async runtimes are libraries used for executing async applications. Runtimes usually bundle together a reactor with one or more executors. Reactors provide subscription mechanisms for external events, like async I/O, interprocess communication, and timers. In an async runtime, subscribers are typically futures representing low-level I/O operations. Executors handle the scheduling and execution of tasks. They keep track of running and suspended tasks, poll futures to completion, and wake tasks when they can make progress. The word "executor" is frequently used interchangeably with "runtime". Here, we use the word "ecosystem" to describe a runtime bundled with compatible traits and features.

Community-Provided Async Crates

The Futures Crate

The futures crate contains traits and functions useful for writing async code. This includes the Stream, Sink, AsyncRead, and AsyncWrite traits, and utilities such as combinators. These utilities and traits may eventually become part of the standard library.

futures has its own executor, but not its own reactor, so it does not support execution of async I/O or timer futures. For this reason, it's not considered a full runtime. A common choice is to use utilities from futures with an executor from another crate.

There is no asynchronous runtime in the standard library, and none are officially recommended. The following crates provide popular runtimes.

  • Tokio: A popular async ecosystem with HTTP, gRPC, and tracing frameworks.
  • async-std: A crate that provides asynchronous counterparts to standard library components.
  • smol: A small, simplified async runtime. Provides the Async trait that can be used to wrap structs like UnixStream or TcpListener.
  • fuchsia-async: An executor for use in the Fuchsia OS.

Determining Ecosystem Compatibility

Not all async applications, frameworks, and libraries are compatible with each other, or with every OS or platform. Most async code can be used with any ecosystem, but some frameworks and libraries require the use of a specific ecosystem. Ecosystem constraints are not always documented, but there are several rules of thumb to determine whether a library, trait, or function depends on a specific ecosystem.

Any async code that interacts with async I/O, timers, interprocess communication, or tasks generally depends on a specific async executor or reactor. All other async code, such as async expressions, combinators, synchronization types, and streams are usually ecosystem independent, provided that any nested futures are also ecosystem independent. Before beginning a project, it's recommended to research relevant async frameworks and libraries to ensure compatibility with your chosen runtime and with each other.

Notably, Tokio uses the mio reactor and defines its own versions of async I/O traits, including AsyncRead and AsyncWrite. On its own, it's not compatible with async-std and smol, which rely on the async-executor crate, and the AsyncRead and AsyncWrite traits defined in futures.

Conflicting runtime requirements can sometimes be resolved by compatibility layers that allow you to call code written for one runtime within another. For example, the async_compat crate provides a compatibility layer between Tokio and other runtimes.

Libraries exposing async APIs should not depend on a specific executor or reactor, unless they need to spawn tasks or define their own async I/O or timer futures. Ideally, only binaries should be responsible for scheduling and running tasks.

Single Threaded vs Multi-Threaded Executors

Async executors can be single-threaded or multi-threaded. For example, the async-executor crate has both a single-threaded LocalExecutor and a multi-threaded Executor.

A multi-threaded executor makes progress on several tasks simultaneously. It can speed up the execution greatly for workloads with many tasks, but synchronizing data between tasks is usually more expensive. It is recommended to measure performance for your application when you are choosing between a single- and a multi-threaded runtime.

Tasks can either be run on the thread that created them or on a separate thread. Async runtimes often provide functionality for spawning tasks onto separate threads. Even if tasks are executed on separate threads, they should still be non-blocking. In order to schedule tasks on a multi-threaded executor, they must also be Send. Some runtimes provide functions for spawning non-Send tasks, which ensures every task is executed on the thread that spawned it. They may also provide functions for spawning blocking tasks onto dedicated threads, which is useful for running blocking synchronous code from other libraries.

Final Project: Building a Concurrent Web Server with Async Rust

In this chapter, we'll use asynchronous Rust to modify the Rust book's single-threaded web server to serve requests concurrently.

Recap

Here's what the code looked like at the end of the lesson.

src/main.rs:

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    // Listen for incoming TCP connections on localhost port 7878
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    // Block forever, handling each request that arrives at this IP address
    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // Read the first 1024 bytes of data from the stream
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    // Respond with greetings or a 404,
    // depending on the data in the request
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    // Write response back to the stream,
    // and flush the stream to ensure the response is sent back to the client
    let response = format!("{}{}", status_line, contents);
    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

hello.html:

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
  </body>
</html>

404.html:

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>

If you run the server with cargo run and visit 127.0.0.1:7878 in your browser, you'll be greeted with a friendly message from Ferris!

Running Asynchronous Code

An HTTP server should be able to serve multiple clients concurrently; that is, it should not wait for previous requests to complete before handling the current request. The book solves this problem by creating a thread pool where each connection is handled on its own thread. Here, instead of improving throughput by adding threads, we'll achieve the same effect using asynchronous code.

Let's modify handle_connection to return a future by declaring it an async fn:

async fn handle_connection(mut stream: TcpStream) {
    //<-- snip -->
}

Adding async to the function declaration changes its return type from the unit type () to a type that implements Future<Output=()>.

If we try to compile this, the compiler warns us that it will not work:

$ cargo check
    Checking async-rust v0.1.0 (file:///projects/async-rust)
warning: unused implementer of `std::future::Future` that must be used
  --> src/main.rs:12:9
   |
12 |         handle_connection(stream);
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: `#[warn(unused_must_use)]` on by default
   = note: futures do nothing unless you `.await` or poll them

Because we haven't awaited or polled the result of handle_connection, it'll never run. If you run the server and visit 127.0.0.1:7878 in a browser, you'll see that the connection is refused; our server is not handling requests.

We can't await or poll futures within synchronous code by itself. We'll need an asynchronous runtime to handle scheduling and running futures to completion. Please consult the section on choosing a runtime for more information on asynchronous runtimes, executors, and reactors.

Adding an Async Runtime

Here, we'll use an executor from the async-std crate. The #[async_std::main] attribute from async-std allows us to write an asynchronous main function. To use it, enable the attributes feature of async-std in Cargo.toml:

[dependencies.async-std]
version = "1.6"
features = ["attributes"]

As a first step, we'll switch to an asynchronous main function, and await the future returned by the async version of handle_connection. Then, we'll test how the server responds. Here's what that would look like:

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        // Warning: This is not concurrent!
        handle_connection(stream).await;
    }
}

Now, let's test to see if our server can handle connections concurrently. Simply making handle_connection asynchronous doesn't mean that the server can handle multiple connections at the same time, and we'll soon see why.

To illustrate this, let's simulate a slow request. When a client makes a request to 127.0.0.1:7878/sleep, our server will sleep for 5 seconds:

use async_std::task;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{}{}", status_line, contents);
    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

This is very similar to the simulation of a slow request from the Book, but with one important difference: we're using the non-blocking function async_std::task::sleep instead of the blocking function std::thread::sleep. It's important to remember that even if a piece of code is run within an async fn and awaited, it may still block. To test whether our server handles connections concurrently, we'll need to ensure that handle_connection is non-blocking.

If you run the server, you'll see that a request to 127.0.0.1:7878/sleep will block any other incoming requests for 5 seconds! This is because there are no other concurrent tasks that can make progress while we are awaiting the result of handle_connection. In the next section, we'll see how to use async code to handle connections concurrently.

Handling Connections Concurrently

The problem with our code so far is that listener.incoming() is a blocking iterator. The executor can't run other futures while listener waits on incoming connections, and we can't handle a new connection until we're done with the previous one.

In order to fix this, we'll transform listener.incoming() from a blocking Iterator to a non-blocking Stream. Streams are similar to Iterators, but can be consumed asynchronously. For more information, see the chapter on Streams.

Let's replace our blocking std::net::TcpListener with the non-blocking async_std::net::TcpListener, and update our connection handler to accept an async_std::net::TcpStream:

use async_std::prelude::*;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    //<-- snip -->
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

The asynchronous version of TcpListener implements the Stream trait for listener.incoming(), a change which provides two benefits. The first is that listener.incoming() no longer blocks the executor. The executor can now yield to other pending futures while there are no incoming TCP connections to be processed.

The second benefit is that elements from the Stream can optionally be processed concurrently, using a Stream's for_each_concurrent method. Here, we'll take advantage of this method to handle each incoming request concurrently. We'll need to import the Stream trait from the futures crate, so our Cargo.toml now looks like this:

+[dependencies]
+futures = "0.3"

 [dependencies.async-std]
 version = "1.6"
 features = ["attributes"]

Now, we can handle each connection concurrently by passing handle_connection in through a closure function. The closure function takes ownership of each TcpStream, and is run as soon as a new TcpStream becomes available. As long as handle_connection does not block, a slow request will no longer prevent other requests from completing.

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            handle_connection(tcpstream).await;
        })
        .await;
}

Serving Requests in Parallel

Our example so far has largely presented concurrency (using async code) as an alternative to parallelism (using threads). However, async code and threads are not mutually exclusive. In our example, for_each_concurrent processes each connection concurrently, but on the same thread. The async-std crate allows us to spawn tasks onto separate threads as well. Because handle_connection is both Send and non-blocking, it's safe to use with async_std::task::spawn. Here's what that would look like:

use async_std::task::spawn;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |stream| async move {
            let stream = stream.unwrap();
            spawn(handle_connection(stream));
        })
        .await;
}

Now we are using both concurrency and parallelism to handle multiple requests at the same time! See the section on multithreaded executors for more information.

Testing the TCP Server

Let's move on to testing our handle_connection function.

First, we need a TcpStream to work with. In an end-to-end or integration test, we might want to make a real TCP connection to test our code. One strategy for doing this is to start a listener on localhost port 0. Port 0 isn't a valid UNIX port, but it'll work for testing. The operating system will pick an open TCP port for us.

Instead, in this example we'll write a unit test for the connection handler, to check that the correct responses are returned for the respective inputs. To keep our unit test isolated and deterministic, we'll replace the TcpStream with a mock.

First, we'll change the signature of handle_connection to make it easier to test. handle_connection doesn't actually require an async_std::net::TcpStream; it requires any struct that implements async_std::io::Read, async_std::io::Write, and marker::Unpin. Changing the type signature to reflect this allows us to pass a mock for testing.

use std::marker::Unpin;
use async_std::io::{Read, Write};

async fn handle_connection(mut stream: impl Read + Write + Unpin) {

Next, let's build a mock TcpStream that implements these traits. First, let's implement the Read trait, with one method, poll_read. Our mock TcpStream will contain some data that is copied into the read buffer, and we'll return Poll::Ready to signify that the read is complete.

    use super::*;
    use futures::io::Error;
    use futures::task::{Context, Poll};

    use std::cmp::min;
    use std::pin::Pin;

    struct MockTcpStream {
        read_data: Vec<u8>,
        write_data: Vec<u8>,
    }

    impl Read for MockTcpStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let size: usize = min(self.read_data.len(), buf.len());
            buf[..size].copy_from_slice(&self.read_data[..size]);
            Poll::Ready(Ok(size))
        }
    }

Our implementation of Write is very similar, although we'll need to write three methods: poll_write, poll_flush, and poll_close. poll_write will copy any input data into the mock TcpStream, and return Poll::Ready when complete. No work needs to be done to flush or close the mock TcpStream, so poll_flush and poll_close can just return Poll::Ready.

    impl Write for MockTcpStream {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            self.write_data = Vec::from(buf);
            return Poll::Ready(Ok(buf.len()));
        }
        fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
        fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

Lastly, our mock will need to implement Unpin, signifying that its location in memory can safely be moved. For more information on pinning and the Unpin trait, see the section on pinning.

    use std::marker::Unpin;
    impl Unpin for MockTcpStream {}

Now we're ready to test the handle_connection function. After setting up the MockTcpStream containing some initial data, we can run handle_connection using the attribute #[async_std::test], similarly to how we used #[async_std::main]. To ensure that handle_connection works as intended, we'll check that the correct data was written to the MockTcpStream based on its initial contents.

    use std::fs;

    #[async_std::test]
    async fn test_handle_connection() {
        let input_bytes = b"GET / HTTP/1.1\r\n";
        let mut contents = vec![0u8; 1024];
        contents[..input_bytes.len()].clone_from_slice(input_bytes);
        let mut stream = MockTcpStream {
            read_data: contents,
            write_data: Vec::new(),
        };

        handle_connection(&mut stream).await;
        let mut buf = [0u8; 1024];
        stream.read(&mut buf).await.unwrap();

        let expected_contents = fs::read_to_string("hello.html").unwrap();
        let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
        assert!(stream.write_data.starts_with(expected_response.as_bytes()));
    }