Конкурентная обработка подключений

Проблема с нашим кодом на данный момент заключается в том, что listener.incoming() является блокирующим итератором. Исполнитель не может запускать другие футуры, пока listener ожидает входящих соединений, и мы не можем обрабатывать новое соединение, пока не закончим с предыдущим.

Чтобы исправить это, мы преобразуем listener.incoming() из блокирующего Iterator в неблокирующий Stream. Потоки похожи на итераторы, но могут использоваться асинхронно. Для получения дополнительной информации см. главу о потоках .

Давайте заменим наш блокирующий std::net::TcpListener на неблокирующий async_std::net::TcpListener и обновим наш обработчик соединения, чтобы он принимал async_std::net::TcpStream:

use async_std::prelude::*;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    //<-- snip -->
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

Асинхронная версия TcpListener реализует трейт Stream для listener.incoming(), это изменение даёт два преимущества. Во-первых, listener.incoming() больше не блокирует исполнителя. Теперь исполнитель может перейти к другим ожидающим футурам, пока нет входящих TCP-соединений для обработки.

Второе преимущество заключается в том, что элементы из Stream могут обрабатываться конкурентно с помощью метода for_each_concurrent. Здесь мы воспользуемся этим методом для конкурентной обработки каждого входящего запроса. Нам нужно импортировать трейт Stream из крейта futures, поэтому наш Cargo.toml теперь выглядит так:

+[dependencies]
+futures = "0.3"

 [dependencies.async-std]
 version = "1.6"
 features = ["attributes"]

Теперь мы можем обрабатывать каждое соединение конкурентно, передавая handle_connection через замыкание. Замыкание становится владельцем каждого TcpStream и запускается, как только становится доступным новый TcpStream. Поскольку handle_connection не блокирующий, медленный запрос больше не будет препятствовать выполнению других запросов.

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            handle_connection(tcpstream).await;
        })
        .await;
}

Параллельное обслуживание запросов

В нашем примере до сих пор конкурентность (с использованием асинхронного кода) в основном представлялась как альтернатива параллелизму (с использованием потоков). Однако асинхронный код и потоки не исключают друг друга. В нашем примере for_each_concurrent обрабатывает каждое соединение, но в одном потоке. async-std также позволяет нам запускать задачи в отдельных потоках. Поскольку handle_connection является Send и неблокирующий, его безопасно использовать с async_std::task::spawn. Вот как это будет выглядеть:

use async_std::task::spawn;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |stream| async move {
            let stream = stream.unwrap();
            spawn(handle_connection(stream));
        })
        .await;
}

Теперь мы используем конкурентность и параллелизм для одновременной обработки нескольких запросов! Дополнительную информацию см. в разделе о многопоточных исполнителях.