Применение: создание исполнителя
Футуры Rust'a ленивы: они ничего не будут делать, если не будут активно доводиться до завершения. Один из способов довести их до завершения - это .await
внутри async
функции, но это просто подталкивает проблему на один уровень вверх: кто будет запускать футуры, возвращённые из async
функций верхнего уровня? Ответ в том, что нам нужен исполнитель для Future
.
Исполнители берут набор футур верхнего уровня и запускают их через вызов метода poll
, до тех пока они не завершатся. Как правило, исполнитель будет вызывать метод poll
у футуры один раз, чтобы запустить. Когда футуры сообщают, что готовы продолжить вычисления при вызове метода wake()
, они помещаются обратно в очередь и вызов poll
повторяется до тех пор, пока Future
не будут завершены.
В этом разделе мы напишем нашего собственного простого исполнителя, способного одновременно запускать большое количество футур верхнего уровня.
В этом примере мы зависим от futures
ящика для трейта ArcWake
, который обеспечивает простой способ создания Waker
. Отредактируйте Cargo.toml
, чтобы добавить новую зависимость:
[package]
name = "timer_future"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2021"
[dependencies]
futures = "0.3"
Дальше, мы должны в верхней части src/main.rs
импортировать следующее:
use futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
};
use std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
};
// The timer we wrote in the previous section:
use timer_future::TimerFuture;
Наш исполнитель будет работать, посылая задачи для запуска по каналу. Исполнитель извлечёт события из канала и запустит их. Когда задача будет готова выполнить больше работы (будет пробуждена), она может запланировать повторный опрос самой себя, отправив себя обратно в канал.
В этом проекте самому исполнителю просто необходим получатель для канала задачи. Пользователь получит экземпляр отправителя, чтобы он мог создавать новые футуры. Сами задачи - это просто футуры, которые могут перезапланировать самих себя, поэтому мы сохраним их как сочетание футуры и отправителя, который задача может использовать, чтобы добавить себя в очередь.
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
/// In-progress future that should be pushed to completion.
///
/// The `Mutex` is not necessary for correctness, since we only have
/// one thread executing tasks at once. However, Rust isn't smart
/// enough to know that `future` is only mutated from one thread,
/// so we need to use the `Mutex` to prove thread-safety. A production
/// executor would not need this, and could use `UnsafeCell` instead.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Handle to place the task itself back onto the task queue.
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// Maximum number of tasks to allow queueing in the channel at once.
// This is just to make `sync_channel` happy, and wouldn't be present in
// a real executor.
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
Давайте ещё добавим метод в spawner, чтобы было проще создавать новые футуры. Этот метод возьмёт тип футуры, поместит его в коробку и создаст новую задачу Arc<Task>
с ним внутри. И эта задача может быть помещена в очередь для исполнителя.
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.try_send(task).expect("too many tasks queued");
}
}
Чтобы опросить futures
, нам нужно создать Waker
. Как описано в разделе задачи пробуждения, Waker
s отвечают за планирование задач, которые будут опрошены снова после вызова wake
. Waker
s сообщают исполнителю, какая именно задача завершилась, позволяя опрашивать как раз те futures
, которые готовы к продолжению выполнения. Простой способ создать новый Waker
, необходимо реализовать типаж ArcWake
, а затем использовать waker_ref
или .into_waker()
функции для преобразования Arc<impl ArcWake>
в Waker
. Давайте реализуем ArcWake
для наших задач, чтобы они были превращены в Waker
s и могли пробуждаться:
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Implement `wake` by sending this task back onto the task channel
// so that it will be polled again by the executor.
let cloned = arc_self.clone();
arc_self
.task_sender
.try_send(cloned)
.expect("too many tasks queued");
}
}
Когда Waker
создаётся на основе Arc<Task>
, вызывая wake()
, это вызовет отправку копии Arc
в канал задач. Тогда нашему исполнителю нужно подобрать задание и опросить его. Давайте реализуем это:
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it.
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// Create a `LocalWaker` from the task itself
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&waker);
// `BoxFuture<T>` is a type alias for
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
// We can get a `Pin<&mut dyn Future + Send + 'static>`
// from it by calling the `Pin::as_mut` method.
if future.as_mut().poll(context).is_pending() {
// We're not done processing the future, so put it
// back in its task to be run again in the future.
*future_slot = Some(future);
}
}
}
}
}
Поздравляю! Теперь у нас есть работающий исполнитель futures
. Мы даже можем использовать его для запуска async/.await
кода и пользовательских futures
, таких как TimerFuture
которую мы описали ранее:
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// Spawn a task to print before and after waiting on a timer.
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// Drop the spawner so that our executor knows it is finished and won't
// receive more incoming tasks to run.
drop(spawner);
// Run the executor until the task queue is empty.
// This will print "howdy!", pause, and then print "done!".
executor.run();
}