Превращение однопоточного сервера в многопоточный сервер
В текущей реализации сервер обрабатывает каждый запрос по очереди, то есть, он не начнёт обрабатывать второе соединение, пока не завершит обработку первого. При росте числа запросов к серверу, такое последовательное выполнение было бы все менее и менее оптимальным. Если сервер получает какой-то запрос, обработка которого занимает достаточно много времени, последующим запросам придётся ждать завершения обработки длительного запроса, даже если эти новые запросы сами по себе могут быть обработаны быстро. Нам нужно это исправить, но сначала рассмотрим проблему в действии.
Имитация медленного запроса в текущей реализации сервера
Мы посмотрим, как запрос с медленной обработкой может повлиять на другие запросы, сделанные к серверу в текущей реализации. В листинге 20-10 реализована обработка запроса к ресурсу /sleep с эмуляцией медленного ответа, при которой сервер будет ждать 5 секунд перед тем, как ответить.
Файл: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
Мы переключились с if
на match
, так как теперь у нас есть три случая. Нам придётся явно сопоставить срез от request_line
для проверки совпадения шаблона со строковыми литералами; match
не делает автоматические ссылки и разыменования, как это делает метод равенства.
Первая ветка совпадает с блоком if
из листинга 20-9. Вторая ветка соответствует запросу /sleep . Когда этот запрос получен, сервер заснёт на 5 секунд, прежде чем отдать успешную HTML-страницу. Третья ветка совпадает с блоком else
из листинга 20-9.
Можно увидеть, насколько примитивен наш сервер: в реальных библиотеках распознавание разных запросов осуществлялось бы гораздо менее многословно!
Запустите сервер командой cargo run
. Затем откройте два окна браузера: одно с адресом http://127.0.0.1:7878/, другое с http://127.0.0.1:7878/sleep. Если вы несколько раз обратитесь к URI /, то как и раньше увидите, что сервер быстро ответит. Но если вы введёте URI /sleep, а затем загрузите URI /, то увидите что / ждёт, пока /sleep
не отработает полные 5 секунд перед загрузкой страницы.
Есть несколько способов, которые можно использовать, чтобы избавиться от подтормаживания запросов после одного медленного запроса; способ, который мы реализуем, называется пулом потоков.
Улучшение пропускной способности с помощью пула потоков
Пул потоков является группой заранее порождённых потоков, ожидающих в пуле и готовых выполнить задачу. Когда программа получает новую задачу, она назначает эту задачу одному из потоков в пуле, и тогда задача будет обработана этим потоком. Остальные потоки в пуле доступны для обработки любых других задач, поступающих в то время, пока первый поток занят. Когда первый поток завершает обработку своей задачи, он возвращается в пул свободных потоков, готовых приступить к новой задаче. Пул потоков позволяет обрабатывать соединения параллельно, увеличивая пропускную способность вашего сервера.
Мы ограничим число потоков в пуле небольшим числом, чтобы защитить нас от атак типа «отказ в обслуживании» (DoS - Denial of Service); если бы наша программа создавала новый поток в момент поступления каждого запроса, то кто-то сделавший 10 миллионов запросов к серверу, мог бы создать хаос, использовать все ресурсы нашего сервера и остановить обработку запросов.
Вместо порождения неограниченного количества потоков, у нас будет фиксированное количество потоков, ожидающих в пуле. Поступающие запросы будут отправляться в пул для обработки. Пул будет иметь очередь входящих запросов. Каждый из потоков в пуле будет извлекать запрос из этой очереди, обрабатывать запрос и затем запрашивать в очереди следующий запрос. При таком дизайне мы можем обрабатывать N
запросов одновременно, где N
- количество потоков. Если каждый поток отвечает на длительный запрос, последующие запросы могут по-прежнему задержаться в очереди, но теперь мы увеличили количество "длинных" запросов, которые мы можем обработать, перед тем, как эта ситуация снова возникнет.
Этот подход - лишь один из многих способов улучшить пропускную способность веб-сервера. Другими вариантами, на которые возможно стоило бы обратить внимание, являются: модель fork/join, модель однопоточного асинхронного ввода-вывода или модель многопоточного асинхронного ввода-вывода. Если вам интересна эта тема, вы можете почитать больше информации о других решениях и попробовать реализовать их самостоятельно. С таким низкоуровневым языком как Rust, любой из этих вариантов осуществим.
Прежде чем приступить к реализации пула потоков, давайте поговорим о том, как должно выглядеть использование пула. Когда вы пытаетесь проектировать код, сначала необходимо написать клиентский интерфейс. Напишите API кода, чтобы он был структурирован так, как вы хотите его вызывать, затем реализуйте функциональность данной структуры, вместо подхода реализовывать функционал, а затем разрабатывать общедоступный API.
Подобно тому, как мы использовали разработку через тестирование (test-driven) в проекте главы 12, мы будем использовать здесь разработку, управляемую компилятором (compiler-driven). Мы напишем код, вызывающий нужные нам функции, а затем посмотрим на ошибки компилятора, чтобы определить, что мы должны изменить дальше, чтобы заставить код работать. Однако перед этим, в качестве отправной точки, мы рассмотрим технику, которую мы не будем применять в дальнейшем.
Порождение потока для каждого запроса
Сначала давайте рассмотрим, как мог бы выглядеть код, если бы он создавал бы новый поток для каждого соединения. Как упоминалось ранее, мы не планируем использовать этот способ в окончательной реализации, из-за возможных проблем при потенциально неограниченном числе порождённых потоков. Это лишь отправная точка, с которой начнёт работу наш многопоточный сервер. Затем мы улучшим код, добавив пул потоков, и тогда разницу между этими двумя решениями будет легче заметить. В листинге 20-11 показаны изменения, которые нужно внести в код main
, чтобы порождать новый поток для обработки каждого входящего соединения внутри цикла for
.
Файл: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
Как вы изучили в главе 16, функция thread::spawn
создаст новый поток и затем запустит код замыкания в этом новом потоке. Если вы запустите этот код и загрузите /sleep в своём браузере, а затем загрузите / в двух других вкладках браузера, вы действительно увидите, что запросам к / не приходится ждать завершения /sleep. Но, как мы уже упоминали, это в какой-то момент приведёт к сильному снижению производительности системы, так как вы будете создавать новые потоки без каких-либо ограничений.
Создание конечного числа потоков
Мы хотим, чтобы наш пул потоков работал аналогичным, знакомым образом, чтобы переключение с потоков на пул потоков не требовало больших изменений в коде использующем наш API. В листинге 20-12 показан гипотетический интерфейс для структуры ThreadPool
, который мы хотим использовать вместо thread::spawn
.
Файл: src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Мы используем ThreadPool::new
, чтобы создать новый пул потоков с конфигурируемым числом потоков, в данном случае четырьмя. Затем в цикле for
функция pool.execute
имеет интерфейс, похожий на thread::spawn
, в том смысле, что он так же принимает замыкание, код которого пул должен выполнить для каждого соединения. Нам нужно реализовать pool.execute
, чтобы он принимал замыкание и передавал его потоку из пула для выполнения. Этот код пока не скомпилируется, но мы постараемся, чтобы компилятор помог нам это исправить.
Создание ThreadPool
с помощью разработки, управляемой компилятором
Внесите изменения листинга 20-12 в файл src/main.rs, а затем давайте воспользуемся ошибками компилятора из команды cargo check
для управления нашей разработкой. Вот первая ошибка, которую мы получаем:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Замечательно! Ошибка говорит о том, что нам нужен тип или модуль ThreadPool
, поэтому мы сейчас его создадим. Наша реализация ThreadPool
не будет зависеть от того, что делает наш веб-сервер. Итак, давайте переделаем крейт hello
из бинарного в библиотечный, чтобы хранить там нашу реализацию ThreadPool
. После того, как мы переключимся в библиотечный крейт, мы также сможем использовать отдельную библиотеку пула потоков для любой подходящей работы, а не только для обслуживания веб-запросов.
Создайте файл src/lib.rs, который содержит следующий код, который является простейшим определением структуры ThreadPool
, которое мы можем иметь на данный момент:
Файл: src/lib.rs
pub struct ThreadPool;
Затем отредактируйте файл main.rs, чтобы внести ThreadPool
из библиотечного крейта в текущую область видимости, добавив следующий код в начало src/main.rs:
Файл: src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Этот код по-прежнему не будет работать, но давайте проверим его ещё раз, чтобы получить следующую ошибку, которую нам нужно устранить:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Эта ошибка указывает, что далее нам нужно создать ассоциированную функцию с именем new
для ThreadPool
. Мы также знаем, что new
должна иметь один параметр, который может принимать 4
в качестве аргумента и должен возвращать экземпляр ThreadPool
. Давайте реализуем простейшую функцию new
, которая будет иметь эти характеристики:
Файл: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
Мы выбираем usize
в качестве типа параметра size
, потому что мы знаем, что отрицательное число потоков не имеет никакого смысла. Мы также знаем, что мы будем использовать число 4 в качестве количества элементов в коллекции потоков, для чего предназначен тип usize
, как обсуждалось в разделе "Целочисленные типы" главы 3.
Давайте проверим код ещё раз:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Теперь мы ошибка возникает из-за того, что у нас нет метода execute
в структуре ThreadPool
. Вспомните раздел "Создание конечного числа потоков", в котором мы решили, что наш пул потоков должен иметь интерфейс, похожий на thread::spawn
. Кроме того, мы реализуем функцию execute
, чтобы она принимала замыкание и передавала его свободному потоку из пула для запуска.
Мы определим метод execute
у ThreadPool
, принимающий замыкание в качестве параметра. Вспомните из раздела "Перемещение захваченных значений из замыканий и трейты Fn
" главы 13 информацию о том, что мы можем принимать замыкания в качестве параметров тремя различными типажами: Fn
, FnMut
и FnOnce
. Нам нужно решить, какой тип замыкания использовать здесь. Мы знаем, что в конечном счёте мы сделаем что-то похожее на реализацию стандартной библиотеки thread::spawn
, поэтому мы можем посмотреть, какие ограничения накладывает на свой параметр сигнатура функции thread::spawn
. Документация показывает следующее:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Параметр типа F
- это как раз то, что нас интересует; параметр типа T
относится к возвращаемому значению и нам он не интересен. Можно увидеть, что spawn
использует FnOnce
в качестве ограничения типажа у F
. Возможно это как раз то, чего мы хотим, так как в конечном итоге мы передадим полученный в execute
аргумент в функцию spawn
. Дополнительную уверенность в том, что FnOnce
- это именно тот типаж, который мы хотим использовать, нам даёт факт, что поток для выполнения запроса будет выполнять замыкание этого запроса только один раз, что соответствует части Once
("единожды") в названии типажа FnOnce
.
Параметр типа F
также имеет ограничение типажа Send
и ограничение времени жизни 'static
, которые полезны в нашей ситуации: нам нужен Send
для передачи замыкания из одного потока в другой и 'static
, потому что мы не знаем, сколько времени поток будет выполняться. Давайте создадим метод execute
для ThreadPool
, который будет принимать обобщённый параметр типа F
со следующими ограничениями:
Файл: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Мы по-прежнему используем ()
после FnOnce
потому что типаж FnOnce
представляет замыкание, которое не принимает параметров и возвращает единичный тип ()
. Также как и при определении функций, тип возвращаемого значения в сигнатуре может быть опущен, но даже если у нас нет параметров, нам все равно нужны скобки.
Опять же, это самая простая реализация метода execute
: она ничего не делает, мы просто пытаемся сделать код компилируемым. Давайте проверим снова:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Сейчас мы получаем только предупреждения, что означает, что код компилируется! Но обратите внимание, если вы попробуете cargo run
и сделаете запрос в браузере, вы увидите ошибки в браузере, которые мы видели в начале главы. Наша библиотека на самом деле ещё не вызывает замыкание, переданное в execute
!
Примечание: вы возможно слышали высказывание о языках со строгими компиляторами, таких как Haskell и Rust, которое звучит так: «Если код компилируется, то он работает». Но это высказывание не всегда верно. Наш проект компилируется, но абсолютно ничего не делает! Если бы мы создавали реальный, законченный проект, это был бы хороший момент начать писать модульные тесты, чтобы проверять, что код компилируется и имеет желаемое поведение.
Проверка количества потоков в new
Мы ничего не делаем с параметрами new
и execute
. Давайте реализуем тела этих функций с нужным нам поведением. Для начала давайте подумаем о new
. Ранее мы выбрали беззнаковый тип для параметра size
, потому что пул с отрицательным числом потоков не имеет смысла. Пул с нулём потоков также не имеет смысла, однако ноль - это вполне допустимое значение usize
. Мы добавим код для проверки того, что size
больше нуля, прежде чем вернуть экземпляр ThreadPool
, и заставим программу паниковать, если она получит ноль, используя макрос assert!
, как показано в листинге 20-13.
Файл: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Мы добавили немного документации для нашей структуры ThreadPool
с помощью комментариев. Обратите внимание, что мы следовали хорошим практикам документирования, добавив раздел, в котором указывается ситуация, при которой функция может аварийно завершаться, как это обсуждалось в главе 14. Попробуйте запустить cargo doc --open
и кликнуть на структуру ThreadPool
, чтобы увидеть как выглядит сгенерированная документация для new
!
Вместо добавления макроса assert!
, как мы здесь сделали, мы могли бы преобразовать функцию new
в функцию build
таким образом, чтобы она возвращала Result
, аналогично тому, как мы делали в функции Config::new
проекта ввода/вывода в листинге 12-9. Но в данном случае мы решили, что попытка создания пула потоков без указания хотя бы одного потока должна быть непоправимой ошибкой. Если вы чувствуете такое стремление, попробуйте написать функцию build
с сигнатурой ниже, для сравнения с функцией new
:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
Создание места для хранения потоков
Теперь, имея возможность удостовериться, что количество потоков для хранения в пуле соответствует требованиям, мы можем создавать эти потоки и сохранять их в структуре ThreadPool
перед тем как возвратить её. Но как мы "сохраним" поток? Давайте ещё раз посмотрим на сигнатуру thread::spawn
:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Функция spawn
возвращает тип JoinHandle<T>
, где T
является типом, который возвращает замыкание. Давайте попробуем использовать JoinHandle
и посмотрим, что произойдёт. В нашем случае замыкания, которые мы передаём пулу потоков, будут обрабатывать соединение и не будут возвращать ничего, поэтому T
будет единичным (unit) типом ()
.
Код в листинге 20-14 скомпилируется, но пока не создаст ни одного потока. Мы изменили определение ThreadPool
так, чтобы он содержал вектор экземпляров thread::JoinHandle<()>
, инициализировали вектор ёмкостью size
, установили цикл for
, который будет выполнять некоторый код для создания потоков, и вернули экземпляр ThreadPool
, содержащий их.
Файл: src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Мы включили std::thread
в область видимости библиотечного крейта, потому что мы используем thread::JoinHandle
в качестве типа элементов вектора в ThreadPool
.
После получения корректного значения size, наш ThreadPool
создаёт новый вектор, который может содержать size
элементов. Функция with_capacity
выполняет ту же задачу, что и Vec::new
, но с важным отличием: она заранее выделяет необходимый объём памяти в векторе. Поскольку мы знаем, что нам нужно хранить size
элементов в векторе, предварительное выделение памяти для этих элементов будет немного более эффективным, чем использование Vec::new
, при котором размер вектора будет увеличиваться по мере вставки элементов.
Если вы снова запустите команду cargo check
, она должна завершиться успешно.
Структура Worker
, ответственная за отправку кода из ThreadPool
в поток
Мы специально оставили комментарий в цикле for
в Листинге 20-14 по поводу создания потоков. Сейчас мы разберёмся, как на самом деле создаются потоки. Стандартная библиотека предоставляет thread::spawn
для создания потоков, причём thread::spawn
ожидает получить некоторый код, который поток должен выполнить, как только он будет создан. Однако в нашем случае мы хотим создавать потоки и заставлять их ожидать код, который мы будем передавать им позже. Реализация потоков в стандартной библиотеке не предоставляет никакого способа сделать это, мы должны реализовать это вручную.
Мы будем реализовывать это поведение, добавив новую структуру данных между ThreadPool
и потоками, которая будет управлять этим новым поведением. Мы назовём эту структуру Worker
("работник"), это общепринятое имя в реализации пулов. Работник берёт код, который нужно выполнить, и запускает этот код внутри рабочего потока. Представьте людей, работающих на кухне ресторана: работники ожидают, пока не поступят заказы от клиентов, а затем они несут ответственность за принятие этих заказов и их выполнение.
Вместо того чтобы хранить вектор экземпляров JoinHandle<()>
в пуле потоков, мы будем хранить экземпляры структуры Worker
. Каждый Worker
будет хранить один экземпляр JoinHandle<()>
. Затем мы реализуем метод у Worker
, который будет принимать замыкание и отправлять его в существующий поток для выполнения. Для того чтобы мы могли различать работники в пуле при логировании или отладке, мы также присвоим каждому работнику id
.
Вот как выглядит новая последовательность действий, которые будут происходить при создании ThreadPool
. Мы реализуем код, который будет отправлять замыкание в поток, после того, как у нас будет Worker
, заданный следующим образом:
- Определим структуру
Worker
, которая содержитid
иJoinHandle<()>
. - Изменим
ThreadPool
, чтобы он содержал вектор экземпляровWorker
. - Определим функцию
Worker::new
, которая принимает номерid
и возвращает экземплярWorker
, который содержитid
и поток, порождённый с пустым замыканием. - В
ThreadPool::new
используем счётчик циклаfor
для генерацииid
, создаём новыйWorker
с этимid
и сохраняем экземпляр "работника" в вектор.
Если вы готовы принять вызов, попробуйте реализовать эти изменения самостоятельно, не глядя на код в листинге 20-15.
Готовы? Вот листинг 20-15 с одним из способов сделать указанные ранее изменения.
Файл: src/lib.rs
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
Мы изменили название поля в ThreadPool
с threads
на workers
, поскольку теперь оно содержит экземпляры Worker
вместо экземпляров JoinHandle<()>
. Мы используем счётчик в цикле for
для передачи цифрового идентификатора в качестве аргумента Worker::new
, и сохраняем каждый новый Worker
в векторе с именем workers
.
Внешний код (вроде нашего сервера в src/bin/main.rs) не обязательно должен знать подробности реализации, касающиеся использования структуры Worker
внутри ThreadPool
, поэтому мы делаем структуру Worker
и её функцию new
приватными. Функция Worker::new
использует заданный нами id
и сохраняет экземпляр JoinHandle<()>
, который создаётся при порождении нового потока с пустым замыканием.
Примечание: Если операционная система не может создать поток из-за нехватки системных ресурсов,
thread::spawn
аварийно завершится. Это приведёт к аварийному завершению нашего сервера целиком, даже если некоторые потоки были созданы успешно. Для простоты будем считать, что нас устраивает такое поведение, но в реальной реализации пула потоков вы, вероятно, захотите использоватьstd::thread::Builder
и его методspawn
, который вместо этого возвращаетResult
.
Этот код скомпилируется и будет хранить количество экземпляров Worker
, которое мы указали в качестве аргумента функции ThreadPool::new
. Но мы всё ещё не обрабатываем замыкание, которое мы получаем в методе execute
. Давайте посмотрим, как это сделать далее.
Отправка запросов в потоки через каналы
Следующая проблема, с которой мы будем бороться, заключается в том, что замыкания, переданные в thread::spawn
абсолютно ничего не делают. Сейчас мы получаем замыкание, которое хотим выполнить, в методе execute
. Но мы должны передать какое-то замыкание в метод thread::spawn
, при создании каждого Worker
во время создания ThreadPool
.
Мы хотим, чтобы вновь созданные структуры Worker
извлекали код для запуска из очереди, хранящейся в ThreadPool
и отправляли этот код в свой поток для выполнения.
Каналы (channels), простой способ коммуникации между двумя потоками, с которыми мы познакомились в главе 16, кажется идеально подойдут для этого сценария. Мы будем использовать канал в качестве очереди заданий, а команда execute
отправит задание из ThreadPool
экземплярам Worker
, которые будут отправлять задание в свой поток. План таков:
ThreadPool
создаст канал и будет хранить отправитель.- Каждый
Worker
будет хранить приёмник. - Мы создадим новую структуру
Job
, которая будет хранить замыкания, которые мы хотим отправить в канал. - Метод
execute
отправит задание, которое он хочет выполнить, в отправляющую сторону канала. - В своём потоке
Worker
будет циклически опрашивать принимающую сторону канала и выполнять замыкание любого задания, которое он получит.
Давайте начнём с создания канала в ThreadPool::new
и удержания отправляющей стороны в экземпляре ThreadPool
, как показано в листинге 20-16. В структуре Job
сейчас ничего не содержится, но это будет тип элемента, который мы отправляем в канал.
Файл: src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
В ThreadPool::new
мы создаём наш новый канал и сохраняем в пуле его отправляющую сторону. Код успешно скомпилируется.
Давайте попробуем передавать принимающую сторону канала каждому "работнику" (структуре Worker), когда пул потоков создаёт канал. Мы знаем, что хотим использовать получающую часть канала в потоке, порождаемым "работником", поэтому мы будем ссылаться на параметр receiver
в замыкании. Код 20-17 пока не компилируется.
Файл: src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Мы внесли несколько небольших и простых изменений: мы передаём принимающую часть канала в Worker::new
, а затем используем его внутри замыкания.
При попытке проверить код, мы получаем ошибку:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
Код пытается передать receiver
нескольким экземплярам Worker
. Это не сработает, поскольку, как вы можете помнить из главы 16: реализация канала, которую предоставляет Rust - несколько производителей, один потребитель. Это означает, что мы не можем просто клонировать принимающую сторону канала, чтобы исправить этот код. Кроме этого, мы не хотим отправлять одно и то же сообщение нескольким потребителям, поэтому нам нужен единый список сообщений для множества обработчиков, чтобы каждое сообщение обрабатывалось лишь один раз.
Кроме того, удаление задачи из очереди канала включает изменение receiver
, поэтому потокам необходим безопасный способ делиться и изменять receiver
, в противном случае мы можем получить условия гонки (как описано в главе 16).
Вспомните умные указатели, которые обсуждались в главе 16: чтобы делиться владением между несколькими потоками и разрешать потокам изменять значение, нам нужно использовать тип Arc<Mutex<T>>
. Тип Arc
позволит нескольким "работникам" владеть получателем (receiver), а Mutex
гарантирует что только один "работник" сможет получить задание (job) от получателя за раз. Листинг 20-18 показывает изменения, которые мы должны сделать.
Файл: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
В ThreadPool::new
мы помещаем принимающую сторону канала внутрь Arc
и Mutex
. Для каждого нового "работника" мы клонируем Arc
, чтобы увеличить счётчик ссылок так, что "работники" могут разделять владение принимающей стороной канала.
С этими изменениями код компилируется! Мы подбираемся к цели!
Реализация метода execute
Давайте реализуем наконец метод execute
у структуры ThreadPool
. Мы также изменим тип Job
со структуры на псевдоним типа для типаж-объекта, который будет содержать тип замыкания, принимаемый методом execute
. Как описано в разделе "Создание синонимов типа с помощью псевдонимов типа" главы 19, псевдонимы типов позволяют делать длинные типы короче, облегчая их использование. Посмотрите на листинг 20-19.
Файл: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
После создания нового экземпляра Job
с замыканием, полученным в execute
, мы посылаем его через отправляющий конец канала. На тот случай, если отправка не удастся, вызываем unwrap
у send
. Это может произойти, например, если мы остановим выполнение всех наших потоков, что означает, что принимающая сторона прекратила получать новые сообщения. На данный момент мы не можем остановить выполнение наших потоков: наши потоки будут функционировать до тех пор, пока существует пул. Причина, по которой мы используем unwrap
, заключается в том, что, хотя мы знаем, что сбой не произойдёт, компилятор этого не знает.
Но мы ещё не закончили! В "работнике" (worker) наше замыкание, переданное в thread::spawn
все ещё ссылается только на принимающую сторону канала. Вместо этого нам нужно, чтобы замыкание работало в бесконечном цикле, запрашивая задание у принимающей части канала и выполняя задание, когда оно принято. Давайте внесём изменения, показанные в листинге 20-20 внутри Worker::new
.
Файл: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
Здесь мы сначала вызываем lock
у receiver
, чтобы получить мьютекс, а затем вызываем unwrap
, чтобы аварийно завершить работу при любых ошибках. Захват блокировки может завершиться неудачей, если мьютекс находится в отравленном состоянии (poisoned state), что может произойти, если какой-то другой поток завершился аварийно, удерживая блокировку, вместо снятия блокировки. В этой ситуации вызвать unwrap
для аварийного завершения потока вполне оправдано. Не стесняйтесь заменить unwrap
на expect
с сообщением об ошибке, которое имеет для вас значение.
Если мы получили блокировку мьютекса, мы вызываем recv
, чтобы получить Job
из канала. Последний вызов unwrap
позволяет миновать любые ошибки, которые могут возникнуть, если поток, контролирующий отправитель, прекратил функционировать, подобно тому, как метод send
возвращает Err
, если получатель не принимает сообщение.
Вызов recv
- блокирующий, поэтому пока задач нет, текущий поток будет ждать, пока задача не появится. Mutex<T>
гарантирует, что только один поток Worker
за раз попытается запросить задачу.
Наш пул потоков теперь находится в рабочем состоянии! Выполните cargo run
и сделайте несколько запросов:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Успех! Теперь у нас есть пул потоков, который обрабатывает соединения асинхронно. Никогда не создаётся более четырёх потоков, поэтому наша система не будет перегружена, если сервер получит много запросов. Если мы отправим запрос ресурса /sleep, сервер сможет обслуживать другие запросы, обрабатывая их в другом потоке.
Примечание: если вы запросите /sleep в нескольких окнах браузера одновременно, они могут загружаться по одному, с интервалами в 5 секунд. Некоторые веб-браузеры выполняют несколько экземпляров одного и того же запроса последовательно из-за кэширования. Такое ограничение не связано с работой нашего веб-сервера.
После изучения цикла while let
в главе 18 вы можете удивиться, почему мы не написали код рабочего потока (worker thread), как показано в листинге 20-22.
Файл: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Этот код компилируется и запускается, но не даёт желаемого поведения: медленный запрос всё равно приведёт к тому, что другие запросы будут ждать обработки. Причина здесь несколько тоньше: структура Mutex
не имеет публичного метода unlock
, так как владение блокировкой основано на времени жизни MutexGuard<T>
внутри LockResult<MutexGuard<T>>
, которое возвращает метод lock
. Во время компиляции анализатор заимствований может проследить за выполнением правила, согласно которому к ресурсу, охраняемому Mutex
, нельзя получить доступ пока мы удерживаем блокировку. Однако в этой реализация мы также можем получить ситуацию, когда блокировка будет удерживаться дольше, чем предполагалось, если мы не будем внимательно учитывать время жизни MutexGuard<T>
.
Код в листинге 20-20, использующий let job = receiver.lock().unwrap().recv().unwrap();
работает, потому что при использовании let
любые промежуточные значения, используемые в выражении справа от знака равенства, немедленно уничтожаются после завершения инструкции let
. Однако while let
(и if let
и match
) не удаляет временные значения до конца связанного блока. Таким образом, в листинге 20-21 блокировка не снимается в течение всего времени вызова job()
, что означает, что другие работники не могут получать задания.