Вызовы задачи при помощи Waker
Обычно футуры не могут разрешиться сразу же, как для них
вызвали метод poll
. Когда это произойдёт, футура
должна быть уверена, что её снова опросят, когда она будет готова
прогрессировать. Это решается при помощи типа Waker
.
Футура опрашивается как часть "задачи" каждый раз, когда происходит её опрос. Задачи - это высокоуровневые футуры, с которыми работает исполнитель.
Waker
предоставляет метод wake()
,
который может быть использован, чтобы сказать исполнителю, что
соответствующая задача должна быть пробуждена. Когда
вызывается wake()
, исполнитель
знает, что задача, связанная с Waker
, готова к
выполнению, и её футура должна быть опрошена снова.
Waker
так же реализует clone()
, так
что вы можете его копировать, где это необходимо, и хранить.
Давайте попробуем реализовать простой таймер с использованием Waker
.
Применение: Создание таймера
В качестве примера, мы просто раскручиваем новый поток при создании таймера, спим в течение необходимого времени, а затем через какое-то время сообщаем о том, что заданный временной промежуток истёк.
Вот список импортов, которые нам понадобятся:
#![allow(unused)] fn main() { use std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }; }
Начнём с определения типа футуры. Нашей футуре необходим
канал связи, чтобы сообщить о том, что время таймера истекло и
футура должна завершиться. В качестве канала связи между
таймером и футурой мы будем использовать разделяемое
значение Arc<Mutex<..>>
.
#![allow(unused)] fn main() { pub struct TimerFuture { shared_state: Arc<Mutex<SharedState>>, } /// Shared state between the future and the waiting thread struct SharedState { /// Whether or not the sleep time has elapsed completed: bool, /// The waker for the task that `TimerFuture` is running on. /// The thread can use this after setting `completed = true` to tell /// `TimerFuture`'s task to wake up, see that `completed = true`, and /// move forward. waker: Option<Waker>, } }
Теперь давайте реализуем Future
для нашей футуры!
#![allow(unused)] fn main() { impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Look at the shared state to see if the timer has already completed. let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { Poll::Ready(()) } else { // Set waker so that the thread can wake up the current task // when the timer has completed, ensuring that the future is polled // again and sees that `completed = true`. // // It's tempting to do this once rather than repeatedly cloning // the waker each time. However, the `TimerFuture` can move between // tasks on the executor, which could cause a stale waker pointing // to the wrong task, preventing `TimerFuture` from waking up // correctly. // // N.B. it's possible to check for this using the `Waker::will_wake` // function, but we omit that here to keep things simple. shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } }
Просто, не так ли? Если поток установит shared_state.completed = true
, мы закончили!
В противном случае мы клонируем Waker
для
текущей задачи и сохраняем его в shared_state.waker
.
Так поток может разбудить задачу позже.
Важно отметить, что мы должны обновлять Waker
каждый раз, когда футура опрашивается, потому что она может
быть перемещена в другую задачу с другим Waker
.
Это может произойти когда футуры после опроса передаются
между задачами.
Наконец, нам нужен API, чтобы фактически построить таймер и запустить поток:
#![allow(unused)] fn main() { impl TimerFuture { /// Create a new `TimerFuture` which will complete after the provided /// timeout. pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // Spawn the new thread let thread_shared_state = shared_state.clone(); thread::spawn(move || { thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // Signal that the timer has completed and wake up the last // task on which the future was polled, if one exists. shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } }); TimerFuture { shared_state } } } }
Это всё, что нам нужно, чтобы построить простую футуру таймером. Теперь нам нужен исполнитель, чтобы запустить её на исполнение.