Типаж Future

Типаж Future является центральным для асинхронного программирования в Rust. Future - это асинхронное вычисление, которое может производить значение (хотя значение может быть и пустым, например ()). Упрощённый вариант этого типажа может выглядеть как-то так:


#![allow(unused_variables)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

Футуры могут прогрессировать при помощи функции poll, которая продвигает их к завершению на сколько это возможно. Если футура завершается, она возвращает Poll::Ready(result). Если же она всё ещё не готова завершиться, то - Poll::Pending и обрабатывает функцию wake() таким образом, что она будет вызвана, когда Future будет готова прогрессировать. Когда wake() вызван, исполнитель снова вызывает у Future метод poll, чтобы она смогла продвинуться далее.

Без wake(), исполнитель не имеет возможности узнать, когда какая-либо футура может продвинуться, и ему необходимо постоянно опрашивать каждую футуру. С wake() он точно знает какие футуры готовы прогрессировать.

Например, представим ситуацию, когда мы хотим прочитать из сокета, который может иметь, а может и не иметь данных. Если данные есть, мы можем прочитать их и вернуть Poll::Ready(data), но если данных ещё нет, наша футура блокируется и не может продвинуться дальше. Если данных нет, то мы должны зарегистрировать вызов wake, чтобы он был вызван, когда данные появятся в сокете, и сообщил нашему исполнителю, что футура готова прогрессировать. Простая футура SocketRead может выглядеть следующим образом:


#![allow(unused_variables)]
fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
}

Такая модель футур позволяет держать вместе несколько асинхронных операций без лишних промежуточных выделений памяти. Одновременный запуск нескольких футур или соединение их в цепочку может быть реализовано при помощи не выделяющей памяти машины состояний, например так:


#![allow(unused_variables)]
fn main() {
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}
}

Здесь показано, как несколько футур могут быть запущены одновременно без необходимости раздельной аллокации, позволяя асинхронным программам быть более эффективными. Аналогично, несколько последовательных футур могут быть запущены одна за другой, как тут:


#![allow(unused_variables)]
fn main() {
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}
}

Этот пример показывает, как типаж Future может использоваться для выражения асинхронного управления потоком без необходимости множественной аллокации объектов и глубоко вложенных замыканий. Давайте оставим базовое управление потоком в стороне и поговорим о реальном типаже Future и чем он отличается от написанного нами.


#![allow(unused_variables)]
fn main() {
trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
}

Первое, что вы могли заметить, что наш тип self больше не &mut self, а заменён на Pin<&mut Self>. Мы поговорим о закреплении (pinning) в следующей секции, но пока что знайте, что оно позволяет нам создавать неперемещаемые футуры. Неперемещаемые объекты могут хранить указатели на собственные поля, например struct MyFut { a: i32, ptr_to_a: *const i32 }. Закрепление необходимо для async/await.

Второе, wake: fn() была изменена на &mut Context<'_>. В SimpleFuture мы использовали вызов указателя на функцию (fn()), чтобы сказать исполнителю, что футура должна быть опрошена. Однако, так как fn() имеет нулевой тип, она не может сохранить информацию о том какая футура вызвала wake.

В примере из реального мира, сложное приложение, такое как web-сервер, может иметь тысячи различных подключений все пробуждения которых должны обрабатываться отдельно. Тип Context решает это предоставляя доступ к значению типа Waker, который может быть использован для пробуждения конкретной задачи.