Многопоточное разделяемое состояние

Передача сообщений - это прекрасный способ многопоточной работы, но он не является единственным. Вспомните часть лозунга из документации на языке Go: «не общайтесь, разделяя память».

Как бы выглядело общение, используя разделяемую память? Кроме того, почему энтузиасты передачи сообщений не используют его и делают наоборот?

В каком-то смысле каналы в любом языке программирования похожи на единоличное владение, потому что после передачи значения по каналу вам больше не следует использовать отправленное значение. Многопоточная, совместно используемая память подобна множественному владению: несколько потоков могут одновременно обращаться к одной и той же области памяти. Как вы видели в главе 15, где умные указатели сделали возможным множественное владение, множественное владение может добавить сложность, потому что нужно управлять этими разными владельцами. Система типов Rust и правила владения очень помогают в их правильном управлении. Для примера давайте рассмотрим мьютексы, один из наиболее распространённых многопоточных примитивов для разделяемой памяти.

Мьютексы предоставляют доступ к данным из одного потока (за раз)

Mutex - это сокращение от взаимное исключение (mutual exclusion), так как мьютекс позволяет только одному потоку получать доступ к некоторым данным в любой момент времени. Для того, чтобы получить доступ к данным в мьютексе, поток должен сначала подать сигнал, что он хочет получить доступ запрашивая блокировку (lock) мьютекса. Блокировка - это структура данных, являющаяся частью мьютекса, которая отслеживает кто в настоящее время имеет эксклюзивный доступ к данным. Поэтому мьютекс описывается как объект защищающий данные, которые он хранит через систему блокировки.

Мьютексы имеют репутацию трудных в использовании, потому что вы должны помнить два правила:

  • Перед тем как попытаться получить доступ к данным необходимо получить блокировку.
  • Когда вы закончили работу с данными, которые защищает мьютекс, вы должны разблокировать данные, чтобы другие потоки могли получить блокировку.

Для понимания мьютекса, представьте пример из жизни как групповое обсуждение на конференции с одним микрофоном. Прежде чем участник дискуссии сможет говорить, он должен спросить или дать сигнал, что он хочет использовать микрофон. Когда он получает микрофон, то может говорить столько, сколько хочет, а затем передаёт микрофон следующему участнику, который попросит дать ему выступить. Если участник дискуссии забудет освободить микрофон, когда закончит с ним, то никто больше не сможет говорить. Если управление общим микрофоном идёт не правильно, то конференция не будет работать как было запланировано!

Правильное управление мьютексами может быть невероятно сложным и именно поэтому многие люди с энтузиазмом относятся к каналам. Однако, благодаря системе типов и правилам владения в Rust, вы не можете использовать блокировку и разблокировку неправильным образом.

Mutex<T> API

Давайте рассмотрим пример использования мьютекса в листинге 16-12 без использования нескольких потоков:

Файл: src/main.rs

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

Листинг 16-12: Изучение API Mutex для простоты в однопоточном контексте.

Как и во многих типах, мы создаём Mutex<T> используя ассоциированную функцию new. Чтобы получить доступ к данным внутри мьютекса, мы используем метод lock для получения блокировки. Этот вызов блокирует текущий поток, поэтому он не может выполнять какую-либо другую работу, пока не наступит наша очередь получить блокировку.

Вызов lock завершится неудачей, если запаникует другой поток, удерживающий блокировку. В этом случае никто никогда не сможет получить блокировку, поэтому мы решили вызвать unwrap и вызвать панику, если окажемся в такой ситуации.

После того как мы получили блокировку, мы можем рассматривать возвращаемое значение, в данном случае с именем num, как изменяемую ссылку на данные внутри. Система типов гарантирует, что мы получим блокировку перед использованием значения из m: Mutex<i32> не является типом i32, поэтому мы должны получить блокировку, чтобы иметь возможность использовать значение i32. Мы не можем забыть этого сделать; так как система типов не позволит нам получить доступ ко внутреннему i32 значению.

Как вы наверное подозреваете, Mutex<T> является умным указателем. Точнее, вызов lock возвращает умный указатель называемый MutexGuard, обёрнутый в LockResult, который мы обработали с помощью вызова unwrap. Умный указатель типа MutexGuard реализует типаж Deref для указания на внутренние данные; умный указатель также имеет реализацию типажа Drop автоматически снимающего блокировку, когда MutexGuard выходит из области видимости, что происходит в конце внутренней области видимости в листинге 16-12. В результате мы не рискуем забыть снять блокировку и заблокировать мьютекс от использования другими потоками, поскольку снятие блокировки происходит автоматически.

После снятия блокировки можно напечатать значение мьютекса и увидеть, что мы смогли изменить внутреннее i32 на 6.

Разделение Mutex<T> между множеством потоков

Теперь давайте попробуем с помощью Mutex<T> совместно использовать значение между несколькими потоками. Мы стартуем 10 потоков и каждый из них увеличивает значение счётчика на 1, поэтому счётчик изменяется от 0 до 10. Обратите внимание, что в следующих нескольких примерах будут ошибки компилятора и мы будем использовать эти ошибки, чтобы узнать больше об использовании типа Mutex<T> и как Rust помогает нам правильно его использовать. Листинг 16-13 содержит наш начальный пример:

Файл: src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Листинг 16-13. Десять потоков каждый из которых увеличивает счётчик, защищённый Mutex

Мы создаём переменную счётчик counter для хранения i32 внутри Mutex<T>, как мы это делали в листинге 16-12. Далее мы создаём 10 потоков, перебирая диапазон чисел. Мы используем thread::spawn и передаём всем этим потокам одинаковое замыкание, которое перемещает счётчик в поток, запрашивает блокировку на Mutex<T>, вызывая метод lock, а затем добавляет 1 к значению в мьютексе. Когда поток завершит выполнение своего замыкания, num выйдет из области видимости и освободит блокировку, чтобы другой поток мог её получить.

В основном потоке мы собираем все дескрипторы в переменную handles. Затем, как мы это делали в листинге 16-2, вызываем join для каждого дескриптора, чтобы убедиться в завершении всех потоков. В этот момент основной поток получит доступ к блокировке и тоже напечатает результат программы.

Компилятор намекнул, что этот пример не компилируется. Теперь давайте выясним почему!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: use of moved value: `counter`
  --> src/main.rs:9:36
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

error: aborting due to previous error

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state`

To learn more, run the command again with --verbose.

Ага! Первое сообщение об ошибке указывает, что counter перемещён в замыкание для потока, связанного с handle. Этот шаг не позволяет нам захватить counter, когда мы пытаемся вызвать lock и сохранить результат в num2 во втором потоке! Так что Rust говорит нам, что мы не можем передать во владение counter в несколько потоков. Это было трудно увидеть ранее, потому что наши потоки были в цикле и Rust не может указывать на разные потоки в разных итерациях цикла. Давайте исправим ошибку компилятора с помощью метода множественного владения, который мы обсуждали в главе 15.

Множественное владение между множеством потоков

В главе 15 мы давали значение нескольким владельцам, используя умный указатель Rc<T> для создания значения подсчитанных ссылок. Давайте сделаем то же самое здесь и посмотрим, что произойдёт. Мы завернём Mutex<T> в Rc<T> в листинге 16-14 и клонируем Rc<T> перед передачей владения в поток. Теперь, когда мы увидели ошибки, мы также вернёмся к использованию цикла for и сохраним ключевое слово move у замыкания.

Файл: src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Листинг 16-14: Попытка использования Rc, чтобы позволить нескольким потокам владение Mutex

Ещё раз, мы компилируем и получаем ... другие ошибки! Компилятор учит нас.

error[E0277]: the trait bound `std::rc::Rc<std::sync::Mutex<i32>>:
std::marker::Send` is not satisfied in `[closure@src/main.rs:11:36:
15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
  --> src/main.rs:11:22
   |
11 |         let handle = thread::spawn(move || {
   |                      ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>`
cannot be sent between threads safely
   |
   = help: within `[closure@src/main.rs:11:36: 15:10
counter:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send` is
not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
   = note: required because it appears within the type
`[closure@src/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
   = note: required by `std::thread::spawn`

Вау, сообщение об ошибке очень многословное! Вот некоторые важные части, на которых нужно сосредоточить внимание: первая встроенная ошибка говорит о том, что ``std::rc::Rc<std::sync::Mutex> cannot be sent between threads safely. Причиной этого является следующая важная часть сообщения об ошибке. Сообщение об ошибке говорит, the trait bound Send is not satisfied. Мы поговорим про типаж Send в следующем разделе: это один из типажей гарантирующих что типы, используемые потоками, предназначены для использования в многопоточных ситуациях.

К сожалению, Rc<T> небезопасен для совместного использования между потоками. Когда Rc<T> управляет счётчиком ссылок, он добавляется значение к счётчику для каждого вызова clone и вычитается значение из счётчика, когда каждое клонированное значение удаляется при выходе из области видимости. Но он не использует примитивы многопоточности, чтобы гарантировать, что изменения в подсчёте не могут быть прерваны другим потоком. Это может привести к неправильным подсчётам - незначительным ошибкам, которые в свою очередь, могут привести к утечкам памяти или удалению значения до того, как мы отработали с ним. Нам нужен тип точно такой же как Rc<T>, но который позволяет изменять счётчик ссылок безопасно из разных потоков.

Атомарный счётчик ссылок Arc<T>

К счастью, Arc<T> является типом аналогичным типу Rc<T>, который безопасен для использования в ситуациях многопоточности. Буква А означает атомарное, что означает тип ссылка подсчитываемая атомарно. Atomics - это дополнительный вид примитивов для многопоточности, который мы не будем здесь подробно описывать: дополнительную информацию смотрите в документации стандартной библиотеки для std::sync::atomic. На данный момент вам просто нужно знать, что atomics работают как примитивные типы, но безопасны для совместного использования между потоками.

Вы можете спросить, почему все примитивные типы не являются атомарными и почему стандартные типы библиотек не реализованы для использования вместе с типом Arc<T> по умолчанию. Причина в том, что безопасность потоков сопровождается снижением производительности, которое вы хотите платить только тогда, когда вам это действительно нужно. Если вы просто выполняете операции со значениями в одном потоке, то ваш код может работать быстрее, если он не должен обеспечивать гарантии предоставляемые atomics.

Давайте вернёмся к нашему примеру: типы Arc<T> и Rc<T> имеют одинаковый API, поэтому мы исправляем нашу программу, заменяя тип в строках use, вызове new и вызове clone. Код в листинге 16-15, наконец скомпилируется и запустится:

Файл: src/main.rs

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Листинг 16-15: Использование типа Arc для обёртывания Mutex для возможности совместного владения несколькими потоками

Код напечатает следующее:

Result: 10

Мы сделали это! Мы посчитали от 0 до 10, что может показаться не очень впечатляющим, но это позволило больше узнать про Mutex<T> и безопасность потоков. Вы также можете использовать структуру этой программы для выполнения более сложных операций, чем просто увеличение счётчика. Используя эту стратегию, вы можете разделить вычисления на независимые части, разделить эти части на потоки, а затем использовать Mutex<T>, чтобы каждый поток обновлял конечный результат своей частью кода.

Сходства RefCell<T> / Rc<T> и Mutex<T> / Arc<T>

Вы могли заметить, что counter теперь не изменяемый, но мы могли бы получить изменяемую ссылку на значение внутри него; это означает, что Mutex<T> обеспечивает внутреннюю изменяемость как и семейство Cell типов. Таким же образом мы использовали RefCell<T> в главе 15, чтобы позволить нам изменять содержимое внутри Rc<T>, мы используем Mutex<T> для изменения содержимого внутри Arc<T> .

Ещё одна деталь, на которую стоит обратить внимание: Rust не может защитить вас от всевозможных логических ошибок при использовании Mutex<T>. Вспомните в главе 15, что использование Rc<T> сопряжено с риском создания ссылочной зацикленности, где два значения Rc<T> ссылаются друг на друга, что приводит к утечкам памяти. Аналогичным образом, Mutex<T> сопряжён с риском создания взаимных блокировок (deadlocks). Это происходит, когда операции необходимо заблокировать два ресурса и каждый из двух потоков получил одну из блокировок, заставляя оба потока ждать друг друга вечно. Если вам интересна тема взаимных блокировок, попробуйте создать программу Rust, которая её содержит; затем исследуйте стратегии устранения взаимных блокировок для мьютексов на любом языке и попробуйте реализовать их в Rust. Документация стандартной библиотеки для Mutex<T> и MutexGuard предлагает полезную информацию.

Мы завершим эту главу, рассказав о типажах Send и Sync и о том, как мы можем использовать их с пользовательскими типами.