Использование потоков для одновременного выполнения кода

В большинстве современных операционных систем, выполняющийся кода программы происходит в процессе и операционная система управляет множеством процессов одновременно. Внутри программы, также можно иметь независимые части выполняющиеся одновременно. Функционал позволяющий запускать эти независимые части называется потоками.

Разделение вычислений в программе на несколько потоков может повысить производительность, поскольку программа выполняет несколько задач одновременно, но это также добавляет сложности. Поскольку потоки могут работать одновременно то, нет внутренней гарантии порядка в котором будут выполняться части вашего кода в разных потоках. Это может привести к таким проблемам, как:

  • Состояние гонок, когда потоки обращаются к данным или ресурсам в несогласованном порядке
  • Взаимные блокировки, когда два потока ожидают друг друга для завершение использования ресурса занятного другим потоком, препятствуя продолжению работы обоих потоков
  • Дефекты, возникающие только в определённых ситуациях, которые трудно воспроизвести и надёжно исправить

Rust пытается смягчить негативные последствия использования потоков, но программирование в многопоточном контексте все ещё требует тщательного обдумывания структуры кода, которая отличается от структуры кода программ, работающих в одном потоке.

Языки программирования реализуют потоки несколькими различными способами. Многие операционные системы предоставляют API для создания новых потоков. Эта модель, в которой язык вызывает API операционной системы для создания потоков, иногда называется 1:1 , что означает один поток операционной системы на один языковой поток. Стандартная библиотека Rust обеспечивает только реализацию потоков 1:1; есть крейты, которые реализуют другие модели многопоточности, которые делают другие компромиссы.

Создание нового поток с помощью spawn

Чтобы создать новый поток, мы вызываем функцию thread::spawn и передаём ей замыкание (мы говорили о замыканиях в главе 13), содержащее код, который мы хотим запустить в новом потоке. Пример в листинге 16-1 печатает некоторый текст из нового потока и другой текст из основного потока:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

Листинг 16-1: Создание нового потока для печати в отдельном потоке чего-либо во время печати в главном потоке чего-то другого

Обратите внимание, что с помощью этой функции новый поток будет остановлен по окончании основного потока, независимо от того, завершился он или нет. Вывод этой программы может каждый раз немного отличаться, но он будет выглядеть примерно так:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

Вызовы thread::sleep заставляют поток на короткое время останавливать своё выполнение, позволяя выполняться другим потокам. Очерёдность выполнения потоков вероятно будет меняться, но это не гарантировано: это зависит от того, как ваша операционная система планирует потоки. В этом цикле основной поток печатает первым, не смотря на то, что оператор печати из порождённого потока появляется раньше в коде. И даже несмотря на то, что мы написали код, что порождённый поток должен печатать до тех пор, пока значение i не достигнет числа 9, оно дошло только до 5, перед тем как основной поток закрылся.

Если вы запустите этот код и увидите вывод только из основного потока или не видите печати из других потоков, попробуйте увеличить числа в диапазонах, чтобы дать операционной системе больше возможностей для переключения между потоками.

Ожидание завершения работы всех потоков используя join

Код в листинге 16-1 не только преждевременно останавливает порождённый поток из-за окончания основного потока, но также не может гарантировать, что порождённый поток вообще запустится. Причина в том, что нет никакой гарантии относительно порядка выполнения потоков!

Мы можем исправить проблему того, что порождённый поток не запускается или не запускается полностью, сохранением возвращаемого значения thread::spawn в переменной. Возвращаемым из метода thread::spawn типом является JoinHandle. JoinHandle - это собственное значение такое, что когда мы вызываем у него метод join, оно будет ожидать завершения этого потока. В листинге 16-2 показано как использовать JoinHandle потока, который мы создали в листинге 16-1 и вызвать у него join, чтобы убедиться, что порождённый поток завершает работу до выхода из main:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

Листинг 16-2: Сохранение значения JoinHandle из thread::spawn для гарантированного ожидания завершения работы потока

Вызов join у дескриптора блокирует текущий поток, пока поток, представленный дескриптором не завершится. Блокировка потока означает, что потоку запрещено выполнять работу или выходить из него. Поскольку мы поместили вызов join после цикла for основного потока, выполнение листинга 16-2 должно привести к выводу, подобному следующему:

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

Два потока продолжают чередоваться, но основной поток находится в ожидании из-за вызова handle.join() и не завершается до тех пор, пока не завершится запущенный поток.

Но давайте посмотрим, что произойдёт, если мы вместо этого переместим handle.join() перед циклом for в main, например так:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

Основной поток будет ждать завершения порождённого потока, а затем запустит свой цикл for , поэтому выходные данные больше не будут чередоваться, как показано ниже:

hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!

Небольшие детали, такие как где вызывается join, могут повлиять на то, выполняются ли ваши потоки одновременно.

Использование move-замыканий в потоках

Ключевое слово move часто используется с замыканиями, передаваемыми в thread::spawn, потому что замыкание затем получает право собственности на значения, которые оно использует из окружения, тем самым передавая владение этими значениями из одного потока в другой. В разделе “Захват окружения с помощью замыканий” Главы 13 мы обсуждали move в контексте замыканий. Теперь мы больше сосредоточимся на взаимодействии между move и thread::spawn .

Обратите внимание, что в листинге 16-1 замыкание, которое мы передаём в thread::spawn не принимает аргументов: мы не используем никаких данных из основного потока в коде порождённого потока. Чтобы использовать данные из основного потока в порождённом потоке, замыкание порождённого потока должно захватывать значения, которые ему необходимы. Листинг 16-3 показывает попытку создать вектор в главном потоке и использовать его в порождённом потоке. Тем не менее, это не будет работать, как вы увидите через мгновение.

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Листинг 16-3: Попытка использования вектора в дочернем потоке, созданного в основном потоке

Замыкание использует переменную v, поэтому оно захватит v и сделает его частью окружения замыкания. Поскольку thread::spawn запускает это замыкание в новом потоке, мы должны иметь доступ к v внутри этого нового потока. Но при компиляции этого примера, мы получаем следующую ошибку:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:6:18
  |
6 |       let handle = thread::spawn(|| {
  |  __________________^
7 | |         println!("Here's a vector: {:?}", v);
8 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

For more information about this error, try `rustc --explain E0373`.
error: could not compile `threads` due to previous error

Rust выводит как захватить v и так как в println! нужна только ссылка на v, то замыкание пытается заимствовать v. Однако есть проблема: Rust не может определить, как долго будет работать порождённый поток, поэтому он не знает, будет ли всегда действительной ссылка на v.

В листинге 16-4 приведён сценарий, который с большей вероятностью будет иметь ссылку на v, что будет недопустимо:

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    drop(v); // oh no!

    handle.join().unwrap();
}

Листинг 16-4: Поток с замыканием, которое пытается захватить ссылку на v из главного потока, который удаляет v

Если бы нам разрешили запустить этот код, была бы вероятность что порождённый поток был бы немедленно помещён в фоновый режим, вообще без его запуска. Внутри порождённого потока есть ссылка на v, но основной поток немедленно удаляет v используя функцию drop, о которой мы говорили в главе 15. Затем когда порождённый поток начинает выполняться то v больше не действительна и поэтому ссылка на него также недействительным. О нет!

Чтобы исправить ошибку компилятора в листинге 16-3, мы можем использовать совет из сообщения об ошибке:

help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

Добавляя ключевое слово move перед замыканием, мы заставляем замыкание забирать используемые значения во владение, вместо того, чтобы позволить Rust вывести необходимость заимствования значения. Модификация Листинга 16-3, показанная в Листинге 16-5, будет скомпилирована и запущена так, как мы ожидаем:

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Листинг 16-5: Использование ключевого слова move для принуждения замыкания забрать во владение используемых им значений

Что произойдёт с кодом в листинге 16-4, где основной поток вызывает drop при использовании в замыкании move? Исправит ли move этот случай? К сожалению нет; мы получили бы другую ошибку, потому что то, что пытается сделать листинг 16-4 не разрешено по другой причине. Если бы мы добавили move в замыкание, мы бы переместили v в среду замыкания и больше не могли вызывать drop для него в главном потоке. Вместо этого мы получили бы ошибку компилятора:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0382]: use of moved value: `v`
  --> src/main.rs:10:10
   |
4  |     let v = vec![1, 2, 3];
   |         - move occurs because `v` has type `Vec<i32>`, which does not implement the `Copy` trait
5  | 
6  |     let handle = thread::spawn(move || {
   |                                ------- value moved into closure here
7  |         println!("Here's a vector: {:?}", v);
   |                                           - variable moved due to use in closure
...
10 |     drop(v); // oh no!
   |          ^ value used here after move

For more information about this error, try `rustc --explain E0382`.
error: could not compile `threads` due to previous error

Правила владения Rust снова нас спасли! Мы получили ошибку кода из листинга 16-3, потому что Rust был консервативен и только заимствовал v для потока, что означало, что основной поток теоретически может сделать недействительной ссылку на порождённый поток. Сообщив Rust о передаче владения v в порождаемый поток, мы гарантируем Rust, что основной поток больше не будет использовать v. Если мы изменим Листинг 16-4 таким же образом, то мы нарушаем правила владения при попытке использовать v в главном потоке. Ключевое слово move отменяет основное консервативное поведение Rust по заимствованию, что не позволяет нам нарушать правила владения.

Имея базовое понимание потоков и API потоков, давайте посмотрим, что мы можем делать с помощью потоков.