Использование потоков для одновременного выполнения кода

В большинстве современных операционных систем, выполняющийся кода программы происходит в процессе и операционная система управляет множеством процессов одновременно. Внутри программы, также можно иметь независимые части выполняющиеся одновременно. Функционал позволяющий запускать эти независимые части называется потоками.

Разделение вычислений в программе на несколько потоков может повысить производительность, поскольку программа выполняет несколько задач одновременно, но это также добавляет сложности. Поскольку потоки могут работать одновременно то, нет внутренней гарантии порядка в котором будут выполняться части вашего кода в разных потоках. Это может привести к таким проблемам, как:

  • Состояние гонок, когда потоки обращаются к данным или ресурсам в несогласованном порядке
  • Взаимные блокировки, когда два потока ожидают друг друга для завершение использования ресурса занятного другим потоком, препятствуя продолжению работы обоих потоков
  • Дефекты, возникающие только в определённых ситуациях, которые трудно воспроизвести и надёжно исправить

Rust пытается смягчить негативные последствия использования потоков, но программирование в многопоточном контексте все ещё требует тщательного обдумывания структуры кода, которая отличается от структуры кода программ, работающих в одном потоке.

Языки программирования реализуют потоки несколькими различными способами. Многие операционные системы предоставляют API для создания новых потоков. Эта модель, где язык вызывает API операционной системы для создания потоков, иногда называется 1:1, что означает один поток операционной системы соответствует одному потоку в языке.

Многие языки программирования предоставляют свои собственные специальные реализации для потоков. Потоки, предоставляемые языком программирования, называются зелёными (green) потоками и языки использующие зелёные потоки, будут выполнять их в контексте другого числа потоков операционной системы. По этой причине модель с зелёными потоками называется моделью M:N: где на N потоков операционной системы приходится M зелёных потоков, а числа M и N не обязательно одинаковые.

У каждой модели есть свои преимущества и компромиссы и самым важным компромиссом для Rust является поддержка среды выполнения. Среда выполнения - это запутывающий термин, который может иметь разное значение в разных контекстах.

Под средой выполнения мы подразумеваем код, который включён языком в каждый двоичный файл. Этот код может быть большим или маленьким в зависимости от языка, но каждый не ассемблерный язык будет иметь некоторое количество кода для среды выполнения. По этой причине в разговорной речи, когда люди говорят, что у языка нет «среды выполнения», это часто означает что всё же есть какая-то «маленькая среда выполнения». Меньшие среды выполнения имеют меньше возможностей, но имеют преимущество, заключающееся в меньших двоичных файлах, которые облегчают объединение языка с другими языками в большем количестве контекстов. Многие языки программирования лояльны к увеличению размера среды их выполнения: в обмен они получают дополнительные функции. Rust же проектируется таким образом, чтобы практически не иметь среды выполнения. Это приводит к невозможности вызова другого компилируемого языка, например C, для улучшения собственной производительности.

Модель "зелёных" потоков M:N требует большей среды выполнения для управления потоками. Таким образом, стандартная библиотека Rust обеспечивает реализацию потоков только 1:1. Поскольку Rust - это низкоуровневый язык где существуют, то в сообществе были сделаны крейты которые реализуют многопоточность M:N. Если вы предпочитаете тратить накладные расходы на такие аспекты, как больший контроль над тем, какие потоки запускаются; иметь более низкую стоимостью переключения контекста, то подобные крейты - ваш выбор.

Теперь, когда мы дали определение потокам в Rust, давайте рассмотрим, как использовать API, связанное с потоками из стандартной библиотекой.

Создание нового поток с помощью spawn

Чтобы создать новый поток, мы вызываем функцию thread::spawn и передаём ей замыкание (мы говорили о замыканиях в главе 13), содержащее код, который мы хотим запустить в новом потоке. Пример в листинге 16-1 печатает некоторый текст из нового потока и другой текст из основного потока:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

Листинг 16-1: Создание нового потока для печати в отдельном потоке чего-либо во время печати в главном потоке чего-то другого

Обратите внимание, что с помощью этой функции новый поток будет остановлен по окончании основного потока, независимо от того, завершился он или нет. Вывод этой программы может каждый раз немного отличаться, но он будет выглядеть примерно так:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

Вызовы thread::sleep заставляют поток на короткое время останавливать своё выполнение, позволяя выполняться другим потокам. Очерёдность выполнения потоков вероятно будет меняться, но это не гарантировано: это зависит от того, как ваша операционная система планирует потоки. В этом цикле основной поток печатает первым, не смотря на то, что оператор печати из порождённого потока появляется раньше в коде. И даже несмотря на то, что мы написали код, что порождённый поток должен печатать до тех пор, пока значение i не достигнет числа 9, оно дошло только до 5, перед тем как основной поток закрылся.

Если вы запустите этот код и увидите вывод только из основного потока или не видите печати из других потоков, попробуйте увеличить числа в диапазонах, чтобы дать операционной системе больше возможностей для переключения между потоками.

Ожидание завершения работы всех потоков используя join

Код в листинге 16-1 не только преждевременно останавливает порождённый поток из-за окончания основного потока, но также не может гарантировать, что порождённый поток вообще запустится. Причина в том, что нет никакой гарантии относительно порядка выполнения потоков!

Мы можем исправить проблему того, что порождённый поток не запускается или не запускается полностью, сохранением возвращаемого значения thread::spawn в переменной. Возвращаемым из метода thread::spawn типом является JoinHandle. JoinHandle - это собственное значение такое, что когда мы вызываем у него метод join, оно будет ожидать завершения этого потока. В листинге 16-2 показано как использовать JoinHandle потока, который мы создали в листинге 16-1 и вызвать у него join, чтобы убедиться, что порождённый поток завершает работу до выхода из main:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

Листинг 16-2: Сохранение значения JoinHandle из thread::spawn для гарантированного ожидания завершения работы потока

Вызов join у дескриптора блокирует текущий поток, пока поток, представленный дескриптором не завершится. Блокировка потока означает, что потоку запрещено выполнять работу или выходить из него. Поскольку мы поместили вызов join после цикла for основного потока, выполнение листинга 16-2 должно привести к выводу, подобному следующему:

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

Два потока продолжают чередоваться, но основной поток находится в ожидании из-за вызова handle.join() и не завершается до тех пор, пока не завершится запущенный поток.

Но давайте посмотрим, что произойдёт, если мы вместо этого переместим handle.join() перед циклом for в main, например так:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

Основной поток будет ждать завершения порождённого потока, а затем запустит свой цикл for , поэтому выходные данные больше не будут чередоваться, как показано ниже:

hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!

Небольшие детали, такие как где вызывается join, могут повлиять на то, выполняются ли ваши потоки одновременно.

Использование move-замыканий в потоках

Замыкание move часто используется вместе с thread::spawn, потому что оно позволяет использовать данные из одного потока в другом потоке.

В главе 13 мы упоминали, что мы можем использовать ключевое слово move перед списком параметров замыкания, чтобы заставить замыкание принимать во владение значения, которые оно использует в окружении. Эта методика особенно полезна при создании новых потоков для передачи прав владения на значения из одного потока в другой.

Обратите внимание, что в листинге 16-1 замыкание, которое мы передаём в thread::spawn не принимает аргументов: мы не используем никаких данных из основного потока в коде порождённого потока. Чтобы использовать данные из основного потока в порождённом потоке, замыкание порождённого потока должно захватывать значения, которые ему необходимы. Листинг 16-3 показывает попытку создать вектор в главном потоке и использовать его в порождённом потоке. Тем не менее, это не будет работать, как вы увидите через мгновение.

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Листинг 16-3: Попытка использования вектора в дочернем потоке, созданного в основном потоке

Замыкание использует переменную v, поэтому оно захватит v и сделает его частью окружения замыкания. Поскольку thread::spawn запускает это замыкание в новом потоке, мы должны иметь доступ к v внутри этого нового потока. Но при компиляции этого примера, мы получаем следующую ошибку:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:6:18
  |
6 |       let handle = thread::spawn(|| {
  |  __________________^
7 | |         println!("Here's a vector: {:?}", v);
8 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0373`.
error: could not compile `threads`

To learn more, run the command again with --verbose.

Rust выводит как захватить v и так как в println! нужна только ссылка на v, то замыкание пытается заимствовать v. Однако есть проблема: Rust не может определить, как долго будет работать порождённый поток, поэтому он не знает, будет ли всегда действительной ссылка на v.

В листинге 16-4 приведён сценарий, который с большей вероятностью будет иметь ссылку на v, что будет недопустимо:

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    drop(v); // oh no!

    handle.join().unwrap();
}

Листинг 16-4: Поток с замыканием, которое пытается захватить ссылку на v из главного потока, который удаляет v

Если бы нам разрешили запустить этот код, была бы вероятность что порождённый поток был бы немедленно помещён в фоновый режим, вообще без его запуска. Внутри порождённого потока есть ссылка на v, но основной поток немедленно удаляет v используя функцию drop, о которой мы говорили в главе 15. Затем когда порождённый поток начинает выполняться то v больше не действительна и поэтому ссылка на него также недействительным. О нет!

Чтобы исправить ошибку компилятора в листинге 16-3, мы можем использовать совет из сообщения об ошибке:

help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^

Добавляя ключевое слово move перед замыканием, мы заставляем замыкание забирать используемые значения во владение, вместо того, чтобы позволить Rust вывести необходимость заимствования значения. Модификация Листинга 16-3, показанная в Листинге 16-5, будет скомпилирована и запущена так, как мы ожидаем:

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Листинг 16-5: Использование ключевого слова move для принуждения замыкания забрать во владение используемых им значений

Что произойдёт с кодом в листинге 16-4, где основной поток вызывает drop при использовании в замыкании move? Исправит ли move этот случай? К сожалению нет; мы получили бы другую ошибку, потому что то, что пытается сделать листинг 16-4 не разрешено по другой причине. Если бы мы добавили move в замыкание, мы бы переместили v в среду замыкания и больше не могли вызывать drop для него в главном потоке. Вместо этого мы получили бы ошибку компилятора:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0382]: use of moved value: `v`
  --> src/main.rs:10:10
   |
4  |     let v = vec![1, 2, 3];
   |         - move occurs because `v` has type `Vec<i32>`, which does not implement the `Copy` trait
5  | 
6  |     let handle = thread::spawn(move || {
   |                                ------- value moved into closure here
7  |         println!("Here's a vector: {:?}", v);
   |                                           - variable moved due to use in closure
...
10 |     drop(v); // oh no!
   |          ^ value used here after move

error: aborting due to previous error

For more information about this error, try `rustc --explain E0382`.
error: could not compile `threads`

To learn more, run the command again with --verbose.

Правила владения Rust снова нас спасли! Мы получили ошибку кода из листинга 16-3, потому что Rust был консервативен и только заимствовал v для потока, что означало, что основной поток теоретически может сделать недействительной ссылку на порождённый поток. Сообщив Rust о передаче владения v в порождаемый поток, мы гарантируем Rust, что основной поток больше не будет использовать v. Если мы изменим Листинг 16-4 таким же образом, то мы нарушаем правила владения при попытке использовать v в главном потоке. Ключевое слово move отменяет основное консервативное поведение Rust по заимствованию, что не позволяет нам нарушать правила владения.

Имея базовое понимание потоков и API потоков, давайте посмотрим, что мы можем делать с помощью потоков.