Превращение однопоточного сервера в многопоточный сервер

Прямо сейчас сервер будет обрабатывать каждый запрос в очереди, что означает, что он не будет обрабатывать второе соединение, пока первое не завершит обработку. Если бы сервер получал все больше и больше запросов, это последовательное выполнение было бы все менее и менее оптимальным. Если сервер получает какой-то запрос, обработка которого занимает слишком много времени, то последующие запросы должны будут ждать завершения обработки длительного запроса, даже если эти новые запросы могут быть обработаны гораздо быстрее. Нам нужно это исправить, но сначала мы рассмотрим проблему в действии.

Имитация медленного запроса в текущей реализации сервера

Мы посмотрим, как запрос с медленной обработкой может повлиять на другие запросы, сделанные к серверу в текущей реализации. В листинге 20-10 реализована обработка запроса к ресурсу /sleep с эмуляцией медленного ответа, который заставит сервер не работать в течение 5 секунд перед ответом.

Файл: src/main.rs


#![allow(unused)]
fn main() {
use std::thread;
use std::time::Duration;
use std::io::prelude::*;
use std::net::TcpStream;
use std::fs::File;
// --snip--

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    stream.read(&mut buffer).unwrap();
    // --snip--

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    // --snip--
}
}

Листинг 20-10: Имитация медленного запроса путём распознавания обращения к /sleep и засыпанию на 5 секунд

Этот код немного неряшливый, но он достаточно хорошо подходит для целей имитации. Мы создали второй запрос sleep, данные которого распознает сервер. Мы добавили else if после блока if, чтобы проверить запрос к /sleep. Когда этот запрос будет получен, сервер заснёт на 5 секунд, прежде чем отобразить HTML страницу успешного выполнения.

Можно увидеть, насколько примитивен наш сервер: реальные библиотеки будут обрабатывать распознавание нескольких запросов гораздо менее многословно!

Запустите сервер командой cargo run. Затем откройте два окна браузера: одно с адресом http://127.0.0.1:7878/, другое с http://127.0.0.1:7878/sleep. Если вы несколько раз обратитесь к URI /, то как и раньше увидите, что сервер быстро ответит. Но если вы введёте URI /sleep, затем загрузите URI /, то увидите что / ждёт, пока /sleep не отработает полные 5 секунд перед загрузкой страницы.

Есть несколько способов изменить работу нашего веб-сервера, чтобы избежать медленной обработки большого количества запросов из-за одного медленного; способ который мы реализуем является пулом потоков.

Улучшение пропускной способности с помощью пула потоков

Пул потоков является группой заранее порождённых потоков, ожидающих в пуле и готовых выполнить задачу. Когда программа получает новую задачу, она назначает задачу одному из потоков в пуле и этот поток будет обрабатывать задачу. Остальные потоки в пуле доступны для обработки любых других задач, возникающих во время обработки первого потока. Когда первый поток завершает обработку своей задачи, он возвращается в пул свободных потоков, готовых обработать новую задачу. Пул потоков позволяет обрабатывать соединения одновременно, увеличивая пропускную способность вашего сервера.

Мы ограничим число потоков в пуле небольшим числом, чтобы защитить нас от атак типа «отказ в обслуживании» (DoS - Denial of Service); если бы наша программа создавала новый поток в момент поступления каждого запроса, то кто-то сделавший 10 миллионов запросов к серверу, мог бы создать хаос, использовать все ресурсы нашего сервера и остановить обработку запросов.

Вместо порождения неограниченного количества потоков, у нас будет фиксированное количество потоков, ожидающих в пуле. По мере поступления запросов они будут отправляться в пул для обработки. Пул будет поддерживать очередь входящих запросов. Каждый из потоков в пуле будет извлекать запрос из этой очереди, обрабатывать запрос и затем запрашивать в очереди следующий запрос. При таком дизайне мы можем обрабатывать N запросов одновременно, где N - количество потоков. Если каждый поток отвечает на длительный запрос, последующие запросы могут по-прежнему задержаться в очереди, но мы увеличили число долго играющих запросов, которые можно обработать до достижения этой точки.

Этот подход является лишь одним из многих способов улучшить пропускную способность веб-сервера. Другими вариантами, которые вы могли бы изучить являются модель fork/join и однопоточная модель асинхронного ввода-вывода. Если вам интересна эта тема, вы можете прочитать о других решениях больше и попробовать внедрить их в помощью Rust. С языком низкого уровня как Rust, возможны все эти варианты.

Прежде чем приступить к реализации пула потоков, давайте поговорим о том, как должно выглядеть использование пула. Когда вы пытаетесь проектировать код, сначала необходимо написать клиентский интерфейс. Напишите API кода, чтобы он был структурирован так, как вы хотите его вызывать, затем реализуйте функциональность данной структуры, вместо подхода реализовывать функционал, а затем разрабатывать общедоступный API.

Подобно тому, как мы использовали разработку через тестирование (test-driven) в проекте главы 12, мы будем использовать здесь разработку, управляемую компилятором (compiler-driven). Мы напишем код, который вызывает нужные нам функции, а затем посмотрим на ошибки компилятора, чтобы определить, что мы должны изменить дальше, чтобы заставить код работать.

Структура кода, если мы могли бы создавать поток для каждого запроса

Сначала давайте рассмотрим, как мог бы выглядеть код, если он создавал бы новый поток для каждого соединения. Как упоминалось ранее, это не окончательный план, а это отправная точка из-за проблем с возможным порождением неограниченного количества потоков. В листинге 20-11 показаны изменения, которые нужно внести в main, чтобы запускать новый поток для обработки каждого входящего потока соединения в цикле for.

Файл: src/main.rs

use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}
fn handle_connection(mut stream: TcpStream) {}

Листинг 20-11: Порождение нового потока для каждого потока соединения

Как вы изучили в главе 16, thread::spawn создаст новый поток и затем запустит код замыкания в этом новом потоке. Если вы запустите этот код и загрузите /sleep в своём браузере, в затем загрузите / в двух других вкладках браузера, вы действительно увидите, что запросы к / не должны ждать завершения /sleep. Но, как мы уже упоминали, это в конечном счёте перегрузит систему, потому что вы будете создавать новые потоки без каких-либо ограничений.

Создание аналогичного интерфейса для конечного числа потоков

Мы хотим, чтобы наш пул потоков работал аналогичным, знакомым образом, чтобы переключение с потоков на пул потоков не требовало больших изменений в коде использующем наш API. В листинге 20-12 показан гипотетический интерфейс для структуры ThreadPool, который мы хотим использовать вместо thread::spawn.

Файл: src/main.rs

use std::thread;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
struct ThreadPool;
impl ThreadPool {
   fn new(size: u32) -> ThreadPool { ThreadPool }
   fn execute<F>(&self, f: F)
       where F: FnOnce() + Send + 'static {}
}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}
fn handle_connection(mut stream: TcpStream) {}

Листинг 20-12: Наш идеальный интерфейс ThreadPool

Мы используем ThreadPool::new, чтобы создать новый пул потоков с конфигурируемым количеством потоков, в данном случае четыре. Затем в цикле for выполняем pool.execute имеющий интерфейс, аналогичный интерфейсу thread::spawn, в котором выполняется замыкание, которое пул должен выполнить для каждого потока соединения. Нам нужно реализовать pool.execute, чтобы он принимал замыкание и передавал его потоку из пула для выполнения. Этот код не компилируется, но мы постараемся, чтобы компилятор в его исправлении.

Создание структуры ThreadPool использованием разработки, управляемой компилятором

Внесите изменения листинга 20-12 в файл src/main.rs, а затем давайте воспользуемся ошибками компилятора из команды cargo check для управления нашей разработкой. Вот первая ошибка, которую мы получаем:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
  --> src\main.rs:10:16
   |
10 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ Use of undeclared type or module
   `ThreadPool`

error: aborting due to previous error

Замечательно! Ошибка говорит о том, что нам нужен тип или модуль ThreadPool, поэтому мы создадим его сейчас. Наша реализация ThreadPool будет зависеть от того, какую работу выполняет наш веб-сервер. Итак, давайте переделаем крейт hello из бинарного в библиотечный для хранения реализации ThreadPool. После того, как поменяем в библиотечный крейт, мы также сможем использовать отдельную библиотеку пула потоков для любой работы, которую мы хотим выполнить с его использованием, а не только для обслуживания веб-запросов.

Создайте файл src/lib.rs, который содержит следующее, что является простейшим определением структуры ThreadPool, которую мы можем иметь в данный момент:

Файл: src/lib.rs


#![allow(unused)]
fn main() {
pub struct ThreadPool;
}

Затем создайте новый каталог src/bin и переместите двоичный крейт с корнем в src/main.rs в src/bin/main.rs. Это сделает библиотечный крейт основным крейтом в каталоге hello; мы все ещё можем запустить двоичный файл из src/bin/main.rs, используя cargo run. Переместив файл main.rs, отредактируйте его, чтобы подключить крейт библиотеки и добавить тип ThreadPool в область видимости, добавив следующий код в начало src/bin/main.rs:

Файл: src/bin/main.rs

use hello::ThreadPool;

Этот код по-прежнему не будет работать, но давайте проверим его ещё раз, чтобы получить следующую ошибку, которую нам нужно устранить:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
 --> src/bin/main.rs:13:16
   |
13 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^^^^^^ function or associated item not found in
   `hello::ThreadPool`

Эта ошибка указывает, что далее нам нужно создать ассоциированную функцию с именем new для ThreadPool. Мы также знаем, что new должен иметь один параметр, который может принимать 4 в качестве аргумента и должен возвращать экземпляр ThreadPool. Давайте реализуем простейшую функцию new, которая будет иметь эти характеристики:

Файл: src/lib.rs


#![allow(unused)]
fn main() {
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}
}

Мы выбираем usize в качестве типа параметра size, потому что мы знаем, что отрицательное число потоков не имеет никакого смысла. Мы также знаем, что мы будем использовать число 4 в качестве количества элементов в коллекции потоков, для чего предназначен тип usize, как обсуждалось в разделе "Целочисленные типы" главы 3.

Давайте проверим код ещё раз:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
  --> src/bin/main.rs:18:14
   |
18 |         pool.execute(|| {
   |              ^^^^^^^

Теперь мы получаем предупреждение и ошибку. Игнорируем предупреждение не надолго, ошибка происходит потому что у нас нет метода execute в структуре ThreadPool. Вспомните раздел "Создание подобного интерфейса для конечного числа потоков", в котором мы решили, что наш пул потоков должен иметь интерфейс, похожий на thread::spawn. Кроме того, мы реализуем функцию execute, чтобы она принимала замыкание и передавала его свободному потоку из пула для запуска.

Мы определим метод execute у ThreadPool для приёма замыкания в качестве параметра. Вспомните раздел "Хранение замыканий с использованием общих параметров и типажей Fn" главы 13 и о том, что мы можем принимать замыкания в качестве параметров с тремя различными типажами: Fn , FnMut и FnOnce. Нам нужно решить, какой тип замыкания использовать здесь. Мы знаем, что в конечном счёте мы сделаем что-то похожее на реализацию стандартной библиотеки thread::spawn, поэтому мы можем посмотреть, какие ограничения накладывает на его параметр в сигнатуре thread::spawn. Документация показывает следующее:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

Параметр типа F - это тот, который нас интересует; параметр типа T относится к возвращаемому значению и нам он не интересен. Можно увидеть, что spawn использует FnOnce в качестве ограничения типажа у F. Это, вероятно то, чего мы хотим, потому что мы в конечном итоге передадим получаемый аргумент в execute для spawn. Мы также можем быть ещё более уверены, что FnOnce - это тот типаж, который мы хотим использовать, поскольку поток для выполнения запроса будет выполнять этот запрос только один раз, что соответствует параметру Once в типаже FnOnce.

Параметр типа F также имеет ограничение типажа Send и ограничение времени жизни 'static, которые полезны в нашей ситуации: нам нужен Send для передачи замыкания из одного потока в другой и 'static, потому что мы не знаем, сколько времени займёт выполнение потока. Давайте создадим метод execute для ThreadPool, который будет принимать обобщённый параметр типа F со следующими ограничениями:

Файл: src/lib.rs


#![allow(unused)]
fn main() {
pub struct ThreadPool;
impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {

    }
}
}

Мы по-прежнему используем () после FnOnce потому что типаж FnOnce представляет замыкание, которое не принимает параметров и возвращает единичный тип (). Также как при определении функций, тип возвращаемого значения может быть опущен в сигнатуре, но даже если у нас нет параметров, нам все равно нужны скобки.

Опять же, это самая простая реализация метода execute: она ничего не делает, мы только пытаемся сделать код компилируемым. Давайте проверим снова:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
 --> src/lib.rs:4:16
  |
4 |     pub fn new(size: usize) -> ThreadPool {
  |                ^^^^
  |
  = note: #[warn(unused_variables)] on by default
  = note: to avoid this warning, consider using `_size` instead

warning: unused variable: `f`
 --> src/lib.rs:8:30
  |
8 |     pub fn execute<F>(&self, f: F)
  |                              ^
  |
  = note: to avoid this warning, consider using `_f` instead

Сейчас мы получаем только предупреждения, что означает, что код компилируется! Но обратите внимание, если вы попробуете cargo run и сделаете запрос в браузере, вы увидите ошибки в браузере, которые мы видели в начале главы. Наша библиотека на самом деле ещё не вызывает замыкание, переданное в execute!

Примечание: вы возможно слышали высказывание о языках со строгими компиляторами, таких как Haskell и Rust, которое звучит так: «Если код компилируется, то он работает». Но это высказывание не всегда верно. Наш проект компилируется, но абсолютно ничего не делает! Если бы мы создавали реальный, законченный проект, это был бы хороший момент начать писать модульные тесты, чтобы проверять, что код компилируется и имеет желаемое поведение.

Проверка количества потоков в new

Мы продолжим получать предупреждения, потому что мы ничего не делаем с параметрами для new и execute. Давайте реализуем тела этих функций в соответствии с желаемым поведением. Для начала давайте подумаем о new. Ранее мы выбирали без знаковый тип для параметра size, потому что пул с отрицательным числом потоков не имеет смысла. Тем не менее, пул с нулевым значением для потоков также не имеет смысла, но ноль является совершенно корректным для типа usize. Мы добавим код, чтобы проверить, что size больше нуля, перед возвращением экземпляра ThreadPool и будем паниковать, если программа получит ноль, используя макрос assert!, как показано в листинге 20-13.

Файл: src/lib.rs


#![allow(unused)]
fn main() {
pub struct ThreadPool;
impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}
}

Листинг 20-13: Реализация ThreadPool::new с паникой, если size равен нулю

Мы добавили документации в ThreadPool с помощью комментариев. Обратите внимание, мы следовали хорошим практикам документирования, добавив раздел, в котором указывается ситуация при которой функция может паниковать как обсуждалось в главе 14. Попробуйте запустить cargo doc --open и кликнуть структуру ThreadPool, чтобы увидеть как выглядит сгенерированная документация для new!

Вместо добавления макроса assert!, как мы здесь сделали, мы могли бы описать у new возвращать Result как мы делали в Config::new проекта ввода/вывода в коде 12-9. Но сейчас мы решили, что попытка создания пула потоков без любого указания количества потоков должно быть не восстанавливаемой ошибкой. Если вы чувствуете себя честолюбивым, попробуйте написать версию new со следующей сигнатурой, чтобы сравнить обе версии:

pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {

Создание места для хранения потоков

Теперь у нас есть способ узнать, что задано допустимое число потоков для хранения в пуле и мы можем создать эти потоки и сохранить их в структуре ThreadPool перед её возвратом. Но как мы "храним" поток? Давайте ещё раз посмотрим на сигнатуру thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

Функция spawn возвращает тип JoinHandle<T>, где T является типом, который возвращает замыкание. Давайте попробуем использовать JoinHandle и посмотрим, что произойдёт. В нашем случае замыкания, которые мы передаём пулу потоков, будут обрабатывать соединение и ничего не будут возвращать, поэтому T будет единичным типом ().

Листинг 20-14 скомпилируется, но пока не создаёт потоков. Мы изменили объявление ThreadPool, чтобы оно содержало вектор экземпляров thread::JoinHandle<()>, инициализировали вектор с размером size, установили цикл for, который будет запускать некоторый код для создания потоков и вернули экземпляр ThreadPool содержащий потоки.

Файл: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool {
            threads
        }
    }

    // --snip--
}

Листинг 20-14: Создание вектора в ThreadPool для сохранения потоков

Мы добавили std::thread в область видимости библиотечного крейта, потому что мы используем thread::JoinHandle в качестве типа элементов вектора в ThreadPool.

После получения корректного значения size, наш ThreadPool создаёт новый вектор, который может содержать size элементов. В этой книге мы ещё не использовали функцию with_capacity, которая выполняет ту же задачу что и Vec::new, но с важным отличием: она заранее выделяет указанную память в векторе. Поскольку мы знаем, что нам нужно хранить size элементов в векторе, выполнение этого выделения немного более эффективно, чем использование Vec::new, который изменяет размеры при вставке элементов.

Когда вы снова запустите cargo check, вы получите ещё несколько предупреждений, но все должно завершится успехом.

Структура Worker ответственная за отправку кода из ThreadPool в поток

Мы оставили комментарий относительно создания потоков в цикле for кода 20-14. Здесь мы рассмотрим, как мы на самом деле создаём потоки. Стандартная библиотека предоставляет thread::spawn как способ создания потоков, а thread::spawn ожидает получить некоторый код, который поток должен запустить как только поток создан. Однако в нашем случае мы хотим создать потоки и заставить их ждать код, который мы отправим им позже. Реализация потоков в стандартной библиотеке не имеет какого то способа это сделать, мы должны реализовать это вручную.

Мы будем реализовывать это поведение с помощью новой структуры данных между ThreadPool и потоками, которая будет управлять этим новым поведением. Мы назовём эту структуру данных Worker, что является общим термином в реализации пулов. Подумайте о людях, работающих на кухне в ресторане: рабочие ждут пока не поступят заказы от клиентов, а затем они несут ответственность за принятие этих заказов и их выполнение.

Вместо хранения вектора JoinHandle<()> в пуле потоков, мы будем сохранять экземпляры структуры Worker. Каждый Worker будет хранить один экземпляр JoinHandle<()>. Затем мы реализуем метод у Worker, который берет код замыкания для запуска и отправляет его в уже запущенный поток для выполнения. Мы также назначим каждому работнику id, чтобы мы могли различать разных работников в пуле при ведении журнала или отладке.

Давайте внесём следующие изменения в то, что происходит при создании ThreadPool. Мы реализуем код, который отправляет замыкание в поток после того, как мы настроили Worker следующим образом:

  1. Определим структуру Worker (работник), которая содержит id и JoinHandle<()>.
  2. Изменим ThreadPool, чтобы он содержал вектор экземпляров Worker.
  3. Определим функцию Worker::new, которая принимает номер id и возвращает экземпляр Worker, который содержит id и поток, порождённый пустым замыканием.
  4. В ThreadPool::new используем счётчик цикла for для генерации id, создаём новый Worker с этим id и сохраняем экземпляр "работника" в векторе.

Если вы готовы принять вызов, попробуйте реализовать эти изменения самостоятельно, прежде чем смотреть код листинге 20-15.

Готовы? Вот листинг 20-15 с одним из способов сделать предыдущие модификации.

Файл: src/lib.rs


#![allow(unused)]
fn main() {
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers
        }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}
}

Листинг 20-15: Изменение ThreadPool для хранения экземпляров Worker вместо непосредственного хранения потоков

Мы изменили имя поля в ThreadPool с threads на workers, потому что теперь оно содержит экземпляры Worker вместо экземпляров JoinHandle<()>. Мы используем счётчик в цикле for в качестве аргумента для Worker::new и сохраняем каждый новый Worker в векторе с именем workers.

Внешний код (вроде нашего сервера в src/bin/main.rs) не должен знать подробности реализации касательно использования структуры Worker внутри ThreadPool, поэтому мы делаем структуру Worker и её новую функцию new приватными. Функция Worker::new использует заданный нами id и сохраняет экземпляр JoinHandle<()>, который создаётся путём порождение нового потока с пустым замыканием.

Этот код скомпилируется и будет хранить количество экземпляров Worker, которое мы указали в качестве аргумента ThreadPool::new. Но мы все ещё не обрабатываем замыкание, которое мы получаем в метод execute. Давайте далее посмотрим, как это сделать.

Отправка запросов в потоки через каналы

Теперь мы рассмотрим проблему, заключающуюся в том, что замыкания переданные в thread::spawn абсолютно ничего не делают. В настоящее время мы получаем замыкание, которое хотим выполнить в методе execute. Но для запуска нам нужно отправить замыкание в thread::spawn, когда мы создаём каждый Worker во время создания ThreadPool.

Мы хотим, чтобы только что созданные структуры Worker извлекали код для запуска из очереди хранящейся в ThreadPool и отправляли этот код в свой поток для выполнения.

В главе 16 вы узнали о каналах (channels) - простом способе связи между двумя потоками, который идеально подойдёт для этого сценария. Мы будем использовать канал в качестве очереди заданий, а команда execute отправит задание из ThreadPool экземплярам Worker, который отправит задание в свой поток. Вот план:

  1. ThreadPool создаст канал и держит его передающую сторону.
  2. Каждый Worker будет удерживать принимающую сторону канала.
  3. Мы создадим новую структуру Job которая будет содержать замыкания, которые мы хотим отправить в канал.
  4. Метод execute отправит задание, которое он хочет выполнить, в отправляющую сторону канала.
  5. В своём потоке Worker будет выполнять цикл с принимающей стороной канала и выполнит замыкание любого получаемого задания.

Давайте начнём с создания канала в ThreadPool::new и удержания отправляющей стороны в экземпляре ThreadPool, как показано в листинге 20-16. В структуре Job сейчас ничего не содержится, но это будет тип элемента который мы отправляем в канал.

Файл: src/lib.rs


#![allow(unused)]
fn main() {
use std::thread;
// --snip--
use std::sync::mpsc;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}
}

Листинг 20-16: Модификация ThreadPool для хранения отправляющей части канала, который отправляет экземпляры Job

В ThreadPool::new мы создаём наш новый канал и пул содержащий отправляющую сторону. Код успешно скомпилируется все ещё с предупреждениями.

Давайте попробуем передавать принимающую сторону канала каждому "работнику" (структуре woker), когда пул потоков создаёт канал. Мы знаем, что хотим использовать получающую часть канала в потоке порождаемым "работником", поэтому мы будем ссылаться на параметр receiver в замыкании. Код 20-17 пока не компилируется.

Файл: src/lib.rs

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}

Листинг 20-17: Передача принимающей части канала "работнику"

Мы внесли несколько небольших и простых изменений: мы передаём принимающую часть канала в Worker::new, а затем используем его внутри замыкания.

При попытке проверить код, мы получаем ошибку:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:27:42
   |
27 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here in
   previous iteration of loop
   |
   = note: move occurs because `receiver` has type
   `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait

Код пытается передать receiver в несколько экземпляров Worker. Это не будет работать, как вы помните из главы 16: реализация канала предоставляемая Rust, является моделью несколько производителей (multiple producer), один потребитель (single consumer). Это означает, что мы не можем просто клонировать принимающую часть канала для исправления этого кода. Даже если бы мы это могли, это не техника которую мы хотели бы использовать; вместо этого мы хотим распределить задачи среди потоков, разделяя один receiver среди всех "работников".

Кроме того, удаление задачи из очереди канала включает изменение receiver, поэтому потокам необходим безопасный способ делиться и изменять receiver, в противном случае мы можем получить условия гонки (как описано в главе 16).

Вспомните умные указатели, которые обсуждались в главе 16: чтобы делиться владением между несколькими потоками и позволить потокам изменять значение, нам нужно использовать тип Arc<Mutex<T>>. Тип Arc позволит нескольким "работникам" владеть получателем (receiver), а Mutex гарантирует что только один "работник" получит задание (job) от получателя в один момент времени. Листинг 20-18 показывает изменения, которые мы должны сделать.

Файл: src/lib.rs


#![allow(unused)]
fn main() {
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }

    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
           receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}
}

Листинг 20-18: Совместное использование принимающей стороны канала среди "работников" используя Arc и Mutex

В ThreadPool::new мы помещаем принимающую сторону канала внутрь Arc и Mutex. Для каждого нового "работника" мы клонируем Arc, чтобы увеличить счётчик ссылок так, что "работники" могут разделять владение принимающей стороны канала.

С этими изменениями код компилируется! Мы подбираемся к цели!

Реализация метода execute

Давайте реализуем метод execute у структуры ThreadPool. Мы также изменим тип Job со структуры на псевдоним типа для типаж-объекта, который содержит тип замыкания принимаемый методом execute. Как описано в разделе "Создание синонимов типа с помощью псевдонимов типа" главы 19, псевдонимы типов позволяют делать длинные типы короче. Посмотрите в листинг 20-19.

Файл: src/lib.rs


#![allow(unused)]
fn main() {
// --snip--
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
use std::sync::mpsc;
struct Worker {}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--
}

Листинг 20-19: Создание псевдонима типа Job для Box, содержащего каждое замыкание и затем отправляющее задание (job) в канал

После создания нового экземпляра Job с помощью замыкания, получаемого в метод execute, мы отправляем это задание в отправляющую часть канала. Мы вызываем unwrap для send в случае неудачной отправки. Это может произойти, если например, мы остановим выполнение всех наших потоков, что означает, что принимающая сторона прекратила получение новых сообщений. На данный момент мы не можем остановить выполнение наших потоков: наши потоки продолжают выполняться, пока существует пул. Причина, по которой мы используем unwrap, заключается в том, что мы знаем, что сбоя не произойдёт, но компилятор этого не знает.

Но мы ещё не закончили! В "работнике" (worker) наше замыкание, переданное в thread::spawn все ещё ссылается только на принимающую сторону канала. Вместо этого нам нужно, чтобы замыкание работало в бесконечном цикле, запрашивая задание у принимающей части канала и выполняя задание, когда оно принято. Давайте внесём изменения, показанные в листинге 20-20 внутри Worker::new.

Файл: src/lib.rs

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                (*job)();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

Листинг 20-20: Получение и выполнение заданий в потоке "работника"

Здесь мы сначала вызываем lock у receiver, чтобы получить мьютекс, а затем вызываем unwrap для паники при любых ошибках. Захват блокировки может завершиться неудачей, если мьютекс находится в отравленном state (poisoned state), что может произойти если какой-то другой поток запаниковал, удерживая блокировку, вместо снятия блокировки. В этой ситуации правильное действие - вызвать unwrap для паники потока. Не стесняйтесь заменить unwrap на expect с сообщением об ошибке, которое имеет для вас значение.

Если мы получим блокировку мьютекса, мы вызываем recv для получения Job из канала. Окончательный вызов unwrap проходит мимо любых ошибок, которые могут произойти, если поток удерживающий отправляющую сторону канала, завершил работу подобно тому, как метод send возвращает Err, если принимающая сторона закрывается.

Вызов recv блокирующий, поэтому если ещё нет задач (job), то текущий поток будет ждать, пока задача не станет доступной. Mutex<T> гарантирует, что только один поток Worker пытается запросить задачу за раз.

Теоретически этот код должен компилироваться. К сожалению, компилятор Rust ещё не совершенен и мы получаем ошибку:

error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
  --> src/lib.rs:63:17
   |
63 |                 (*job)();
   |                 ^^^^^^

Эта ошибка довольно скрытая, потому что проблема тоже довольно скрытая. Чтобы вызвать замыкание FnOnce сохранённое в Box<T> (то есть псевдоним типа Job), замыкание должно переместить себя наружу из Box<T>, потому что замыкание получает владение над self, когда мы его вызываем. В общем, Rust не позволяет нам перемещать значение из Box<T>, потому что Rust не знает, насколько большим будет значение внутри Box<T>. Вспомните в главе 15, что мы использовали Box<T> именно потому, что у нас было что-то неизвестного размера, которое мы хотели сохранить в Box<T>, чтобы получить значение известного размера.

Как вы видели в коде 17-15, мы можем написать методы, использующие синтаксис self: Box<Self>, который позволяет методу владеть значением Self хранящимся в Box<T>. Это именно то, что мы хотим сделать здесь, но к сожалению, Rust не позволит нам: та часть Rust, которая реализует поведение при вызове замыкания, не реализована с помощью self: Box<Self>. Таким образом, Rust ещё не понимает, что он может использовать self: Box<Self> в такой ситуации, чтобы забрать замыкание во владение и переместить замыкание из Box<T>.

Команда Rust все ещё находится в процессе разработки мест, где компилятор может быть улучшен, но в будущем листинг 20-20 должен работать отлично. Люди, подобные вам, работают над решением этой и других проблем! После того, как вы закончите эту книгу, мы будем рады, если вы присоединитесь.

Но сейчас давайте обойдём эту проблему, используя удобный приём. Мы можем явно сказать Rust, что в этом случае мы можем забрать владение значением внутри типа Box<T>, используя self: Box<Self>. Затем, как только замыкание в нашем владении, его можно вызвать. Приём включает в себя объявление нового типажа FnBox у метода call_box, который будет использовать self: Box<Self> в сигнатуре, определяя FnBox для любого типа, реализующего FnOnce(). Также он включает изменение псевдонима нашего типа для использования нового типажа и изменение Worker для использования метода call_box. Эти изменения показаны в листинге 20-21.

Файл: src/lib.rs

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<dyn FnBox + Send + 'static>;

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

Листинг 20-21: Добавление нового типажа FnBox для обхода текущих ограничений Box<FnOnce()>

Сначала мы создаём новый типаж с именем FnBox. Этот типаж имеет один метод call_box, который аналогичен методам call для других типажей Fn* за исключением того, где требуется self: Box<Self> чтобы стать владельцем self и переместить значение наружу из Box<T>.

Затем мы реализуем типаж FnBox для любого типа F, который реализует типаж FnOnce(). Фактически это означает, что любые замыкания FnOnce() могут использовать наш метод call_box. Реализация call_box использует (*self)() для перемещения замыкания наружу из Box<T> и вызова этого замыкания.

Теперь нам нужно, чтобы псевдоним типа Job был бы Box для всего, что реализует наш новый типаж FnBox. Это позволит нам использовать call_box в Worker, когда мы получим значение Job вместо прямого вызова замыкания. Реализация типажа FnBox для любого замыкания FnOnce() означает, что нам не нужно ничего менять в фактических значениях, которые мы отправляем в канал. Теперь Rust может признать корректным все что мы хотим сделать.

Этот трюк довольно хитрый и сложный. Не волнуйтесь, если это не имеет смысла; когда-нибудь это будет совершенно ненужным.

После реализации этого трюка наш пул потоков находится в рабочем состоянии! Выполните cargo run и сделайте несколько запросов:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: #[warn(dead_code)] on by default

warning: field is never used: `id`
  --> src/lib.rs:61:5
   |
61 |     id: usize,
   |     ^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

warning: field is never used: `thread`
  --> src/lib.rs:62:5
   |
62 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: #[warn(dead_code)] on by default

    Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

Успех! Теперь у нас есть пул потоков, который обрабатывает соединения асинхронно. Никогда не создаётся более четырёх потоков, поэтому наша система не будет перегружена, если сервер получает много запросов. Если мы отправим запрос ресурса /sleep, сервер сможет обслуживать другие запросы, запустив их в другом потоке.

Примечание: если вы запрашиваете /sleep в нескольких окнах браузера одновременно, они могут загружаться по одному с интервалами в 5 секунд. Некоторые веб-браузеры выполняют несколько экземпляров одного и того же запроса последовательно из-за кэширования. Данное ограничение не вызвано нашим веб-сервером.

После изучения цикла while let в главе 18 вы можете удивиться, почему мы не написали код рабочего потока (worker thread), как показано в листинге 20-22.

Файл: src/lib.rs

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {} got a job; executing.", id);

                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

Листинг 20-22: Альтернативная реализация Worker::new с использованием while let

Этот код компилируется и выполняется, но не приводит к желаемому поведению потоков: медленный запрос все равно заставляет другие запросы ждать обработки. Причина отчасти неуловима: структура Mutex не имеет публичного метода unlock, поскольку владение блокировкой основано на времени жизни MutexGuard<T> внутри LockResult<MutexGuard<T>>, который возвращает метод lock. Во время компиляции анализатор заимствований может применить правило, согласно которому к ресурсу, защищённому Mutex, нельзя получить доступ, если мы не удерживаем блокировку. Но эта реализация также может привести к тому, что блокировка будет удерживаться дольше, чем предполагалось, если мы не будем тщательно продумывать время жизни MutexGuard<T>. Поскольку значения в выражении while остаются в области видимости всего блока, блокировка сохраняется в течение всего вызова job.call_box(), что означает что другие рабочие потоки не могут получить задачи.

Вместо использования loop и получения блокировки и задания внутри блока, а не за его пределами, удаляется экземпляр MutexGuard возвращённый методом lock, как только завершится оператор let job. Это гарантирует, что блокировка удерживается во время вызова recv, но она освобождается до вызова job.call_box(), что позволяет одновременно обслуживать несколько запросов.