Вызовы задачи при помощи Waker

Обычно футуры не могут разрешиться сразу же, как для них вызвали метод poll. Когда это произойдёт, футура должна быть уверена, что её снова опросят, когда она будет готова прогрессировать. Это решается при помощи типа Waker.

Футура опрашивается как часть "задачи" каждый раз, когда происходит её опрос. Задачи - это высокоуровневые футуры, с которыми работает исполнитель.

Waker предоставляет метод wake(), который может быть использован, чтобы сказать исполнителю, что соответствующая задача должна быть пробуждена. Когда вызывается wake(), исполнитель знает, что задача, связанная с Waker, готова к выполнению, и её футура должна быть опрошена снова.

Waker так же реализует clone(), так что вы можете его копировать, где это необходимо, и хранить.

Давайте попробуем реализовать простой таймер с использованием Waker.

Применение: Создание таймера

В качестве примера, мы просто раскручиваем новый поток при создании таймера, спим в течение необходимого времени, а затем через какое-то время сообщаем о том, что заданный временной промежуток истёк.

Во-первых, запустите новый проект с cargo new --lib timer_future и добавьте импорт, который нам понадобится для начала работы, в src/lib.rs:


#![allow(unused)]
fn main() {
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};
}

Начнём с определения типа футуры. Нашей футуре необходим канал связи, чтобы сообщить о том, что время таймера истекло и футура должна завершиться. В качестве канала связи между таймером и футурой мы будем использовать разделяемое значение Arc<Mutex<..>>.

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}

Теперь давайте реализуем Future для нашей футуры!

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

Просто, не так ли? Если поток установит shared_state.completed = true, мы закончили! В противном случае мы клонируем Waker для текущей задачи и сохраняем его в shared_state.waker. Так поток может разбудить задачу позже.

Важно отметить, что мы должны обновлять Waker каждый раз, когда футура опрашивается, потому что она может быть перемещена в другую задачу с другим Waker. Это может произойти когда футуры после опроса передаются между задачами.

Наконец, нам нужен API, чтобы фактически построить таймер и запустить поток:

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

Это всё, что нам нужно, чтобы построить простую футуру таймером. Теперь нам нужен исполнитель, чтобы запустить её на исполнение.