Конкурентная обработка подключений
Проблема с нашим кодом на данный момент заключается в том, что listener.incoming()
является блокирующим итератором. Исполнитель не может запускать другие футуры, пока listener
ожидает входящих соединений, и мы не можем обрабатывать новое соединение, пока не закончим с предыдущим.
Чтобы исправить это, мы преобразуем listener.incoming()
из блокирующего Iterator в неблокирующий Stream. Потоки похожи на итераторы, но могут использоваться асинхронно. Для получения дополнительной информации см. главу о потоках .
Давайте заменим наш блокирующий std::net::TcpListener
на неблокирующий async_std::net::TcpListener
и обновим наш обработчик соединения, чтобы он принимал async_std::net::TcpStream
:
use async_std::prelude::*;
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
//<-- snip -->
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
Асинхронная версия TcpListener
реализует трейт Stream
для listener.incoming()
, это изменение даёт два преимущества. Во-первых, listener.incoming()
больше не блокирует исполнителя. Теперь исполнитель может перейти к другим ожидающим футурам, пока нет входящих TCP-соединений для обработки.
Второе преимущество заключается в том, что элементы из Stream могут обрабатываться конкурентно с помощью метода for_each_concurrent
. Здесь мы воспользуемся этим методом для конкурентной обработки каждого входящего запроса. Нам нужно импортировать трейт Stream
из крейта futures
, поэтому наш Cargo.toml теперь выглядит так:
+[dependencies]
+futures = "0.3"
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
Теперь мы можем обрабатывать каждое соединение конкурентно, передавая handle_connection
через замыкание. Замыкание становится владельцем каждого TcpStream
и запускается, как только становится доступным новый TcpStream
. Поскольку handle_connection
не блокирующий, медленный запрос больше не будет препятствовать выполнению других запросов.
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
handle_connection(tcpstream).await;
})
.await;
}
Параллельное обслуживание запросов
В нашем примере до сих пор конкурентность (с использованием асинхронного кода) в основном представлялась как альтернатива параллелизму (с использованием потоков). Однако асинхронный код и потоки не исключают друг друга. В нашем примере for_each_concurrent
обрабатывает каждое соединение, но в одном потоке. async-std
также позволяет нам запускать задачи в отдельных потоках. Поскольку handle_connection
является Send
и неблокирующий, его безопасно использовать с async_std::task::spawn
. Вот как это будет выглядеть:
use async_std::task::spawn; #[async_std::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap(); listener .incoming() .for_each_concurrent(/* limit */ None, |stream| async move { let stream = stream.unwrap(); spawn(handle_connection(stream)); }) .await; }
Теперь мы используем конкурентность и параллелизм для одновременной обработки нескольких запросов! Дополнительную информацию см. в разделе о многопоточных исполнителях.