Исполнители и системный ввод/вывод

В главе "Типаж Future", мы обсуждали футуру, которая выполняет асинхронное чтение сокета:


#![allow(unused_variables)]
fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
}

Эта футура читает из сокета доступные данные и если таковых нет, то она передаётся исполнителю с запросом активирования задачи, когда сокет снова станет читаемым. Однако, из текущего примера не ясна реализация типа Socket и, в частности, не совсем очевидно как работает функция set_readable_callback. Как мы можем сделать так, чтобы lw.wake() был вызван, когда сокет станет читаемым? Один из вариантов - иметь поток, который постоянно проверяет стал ли socket читаемым, вызывая при необходимости метод wake(). Тем не менее, такой подход будет весьма не эффективным, так как он требует отдельного потока для каждой блокирующей IO футуры. Это значительно снизит эффективность нашего асинхронного кода.

На практике эта проблема решается при помощи интеграции с IO-зависимыми системными блокирующими примитивами такими, как epoll в Linux, kqueue во FreeBSD и Mac OS, IOCP в Windows и port в Fuchsia (все они предоставляются при помощи кроссплатформенного Rust-пакета mio). Все эти примитивы позволяют потоку заблокироваться с несколькими асинхронными IO-событиями, возвращая одно из завершённых событий. На практике эти API выглядят примерно так:


#![allow(unused_variables)]
fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // ID уникально идентифицирующий событие, которое уже произошло и на которое мы подписались
    id: usize,

    // Набор сигналов, которые мы ожидаем или которые произошли
    signals: Signals,
}

impl IoBlocker {
    /// Создаём новую коллекцию асинхронных IO-событий для блокировки
    fn new() -> Self { ... }

    /// Подпишемся на определённое IO-событие.
    fn add_io_event_interest(
        &self,

        /// Объект, на котором происходит событие
        io_object: &IoObject,

        /// Набор сигналов, которые могут применяться к `io_object`,
        /// для которого должно быть инициировано событие, в паре с
        /// ID, которые передадутся событиям, получившимся в результате нашей подписки.
        event: Event,
    ) { ... }

    /// Заблокируется до появления одного из событий
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// выведет что-то похожее на "Socket 1 is now READABLE", если сокет станет доступным для чтения.
println!("Socket {:?} is now {:?}", event.id, event.signals);
}

Исполнители футур могут использовать эти примитивы для предоставления асинхронных объектов ввода-вывода, таких как сокеты, которые могут настроить обратные вызовы для запуска при определённом IO-событии. В случае нашего примера c SocketRead, функция Socket::set_readable_callback может выглядеть следующим псевдокодом:


#![allow(unused_variables)]
fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` является ссылкой на локальный исполнитель.
        // Это может быть предусмотрено при создании сокета, 
        // большинство реализаций исполнителей делают это через локальный поток, так удобнее.
        let local_executor = self.local_executor;

        // Уникальный ID для объекта ввода вывода.
        let id = self.id;

        // Сохраним `waker` в данных исполнителя,
        // чтобы его можно было вызвать после того, как будет получено событие.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
}

Теперь у нас может быть только один поток исполнителя, который может принимать и отправлять любые события ввода-вывода в нужный Waker, который разбудит соответствующую задачу, позволяющая исполнителю довести больше задач до завершения перед возвратом к проверке новых событий ввода-вывода (и цикл продолжается...).