Передача данных с помощью сообщений между потоками

Одним из все более популярных подходов к обеспечению безопасной многопоточности является передача сообщений, когда потоки или участники взаимодействуют друг с другом, отправляя друг другу сообщения, содержащие данные. Вот идея в слогане из документации языка Go: «Не общайтесь разделяя память; делитесь памятью обмениваясь сообщениями».

Одним из основных инструментов Rust для обеспечения многопоточной отправки сообщений является канал (channel), концепция программирования, для которой стандартная библиотека Rust предоставляет реализацию. Вы можете представить канал в контексте программирования как канал воды, как ручей или река. Если вы поместите что-то вроде резиновой утки или лодки в ручей, она пойдёт вниз по течению до конца водного пути.

Канал в программировании имеет две части: передатчик и приёмник. Передаваемая часть - это место вверх по течению, где вы помещаете резиновых уток в реку, а приёмная часть - там, куда резиновая утка приплывает вниз по течению. Одна часть вашего кода вызывает методы у передатчика с данными, которые вы хотите отправить, а другая часть проверяет на принимающей стороне поступающие сообщения. Говорят, что канал закрыт, если передаваемая или приёмная сторона канала закрылась.

Здесь мы будем работать с программой, в которой есть один поток для генерации значений и отправки их в канал и другой поток, который получит значения и распечатает их. Используя канал, мы будем отправлять простые значения между потоками, чтобы проиллюстрировать этот функционал. Как только вы освоитесь с этой техникой, вы можете использовать каналы для реализации чат системы или системы, в которой множество потоков выполняют части вычисления и отправляют вычисленные части в один поток, который объединяет результаты.

Сначала в листинге 16-6 мы создадим канал, но не будем ничего с ним делать. Обратите внимание, что этот код ещё не компилируется, потому что Rust не может сказать, какой тип значений мы хотим отправить через канал.

Файл: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Листинг 16-6: Создание канала и назначение двух половин для tx (передачи) и rx (приёма)

Мы создаём новый канал, используя mpsc::channel функцию; mpsc означает несколько производителей, один потребитель (multiple producer, single consumer). Коротко, способ которым стандартная библиотека Rust реализует каналы, означает что канал может иметь несколько отправляющих источников генерирующих значения, но только одну принимающую сторону, которая потребляет эти значения. Представьте, что несколько потоков втекают в одну большую реку: все, что идёт вниз по любому из потоков, в конце концов окажется в одной реке. Сейчас мы начнём с одного производителя, но добавим несколько производителей, когда этот пример будет работать.

Функция mpsc::channel возвращает кортеж, первый элемент которого является отправляющей стороной, а второй элемент - принимающей стороной. Сокращения tx и rx традиционно используются во многих областях для обозначения передатчика и приёмника соответственно, поэтому мы называем наши переменные таким образом для обозначения каждой стороны. Мы используем оператор let с шаблоном, который разрушает кортеж; мы обсудим использование шаблонов в операторах let и деструктуризацию в главе 18. Использование оператора let таким образом является удобным подходом для извлечения фрагментов кортежа, возвращаемого функцией mpsc::channel.

Давайте переместим передающую часть в порождённый поток так, чтобы он отправлял одну строку и чтобы таким образом, порождённый поток связывался с основным потоком, как показано в листинге 16-7. Это похоже на то, как если бы вы поместили резиновую утку в реку вверх по течению или отправили сообщение чата из одного потока в другой.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Листинг 16-7: Перемещение tx в порождённый поток и отправка "hi"

Опять же, мы используем вызов thread::spawn для создания нового потока, а затем с помощью ключевого слова move перемещаем tx в замыкание, чтобы порождённый поток завладел tx. Созданный поток должен владеть передающей частью канала, чтобы иметь возможность отправлять сообщения через канал.

Передающая часть имеет метод send, который принимает отправляемое значение. Метод send возвращает тип Result<T, E>, поэтому если принимающая часть закрылась и некуда отправить значение, то операция send выдаст ошибку. В этом примере мы вызываем функцию unwrap для паники в случае ошибки. Но в реальном приложении мы бы обработали её правильно: вернитесь в главу 9, чтобы просмотреть стратегии правильной обработки ошибок.

В листинге 16-8 мы получим значение на принимающей части канала в основном потоке. Это похоже на извлечение резиновой утки из воды в конце реки или на получение сообщения в чате.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Листинг 16-8: Получение значения "hi" в главном потоке и его печать

Принимающая часть канала имеет два полезных метода: recv и try_recv. Мы используем recv, сокращение от receive, которое блокирует выполнение основного потока и ждёт, пока значение не будет передано по каналу. Как только значение отправлено, recv вернёт его внутри Result<T, E>. Когда передающая часть канала закрывается, вызов recv вернёт ошибку, сообщив что больше значений не будет.

Метод try_recv не блокирующий, отличается тем, что немедленно вернёт тип Result<T, E>: где значение Ok содержащее сообщение, если оно доступно и значение Err, если в этот раз нет сообщений. Использование try_recv полезно, если текущий поток выполняет другую работу во время ожидания сообщений: мы могли бы написать цикл довольно часто вызывающий метод try_recv, так чтобы обрабатывать сообщение, если оно доступно и в противном случае выполнять другую работу некоторое время до следующей проверки.

Мы использовали recv в этом примере для простоты; у нас нет никакой другой работы для основного потока, кроме как ждать сообщений, поэтому блокировка основного потока уместна.

При запуске кода листинга 16-8, мы увидим значение, напечатанное из основного потока:

Got: hi

Отлично!

Каналы и передача владения

Правила владения играют жизненно важную роль в отправке сообщений, потому что они помогают писать безопасный многопоточный код. Предотвращение ошибок в многопоточном программировании является преимуществом для размышлений о владении во всех ваших Rust программах. Давайте проведём эксперимент, чтобы показать как каналы и владение действуют совместно для предотвращения проблем: мы попытаемся использовать значение val в порождённом потоке после того как отправим его в канал. Попробуйте скомпилировать код в листинге 16-9, чтобы понять, почему этот код не разрешён:

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Листинг 16-9: Попытка использовать val после того, как мы отправили его в канал

Здесь мы пытаемся напечатать значение val после того, как отправили его в канал вызвав tx.send. Разрешить это было бы плохой идеей: после того, как значение было отправлено в другой поток, текущий поток мог бы изменить или удалить значение, прежде чем мы попытались бы использовать значение снова. Потенциально изменения в другом потоке могут привести к ошибкам или не ожидаемым результатам из-за противоречивых или несуществующих данных. Однако Rust выдаёт нам ошибку, если мы пытаемся скомпилировать код в листинге 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move

error: aborting due to previous error

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing`

To learn more, run the command again with --verbose.

Наша ошибка для многопоточности привела к ошибке компиляции. Функция send вступает во владение своим параметром и когда значение перемещается, получатель становится владельцем этого параметра. Это останавливает нас от случайного использования значения снова после его отправки; анализатор заимствования проверяет, что все в порядке.

Отправка нескольких значений и ожидание получателем

Код в листинге 16-8 компилируется и выполняется, но в нем не ясно показано то, что два отдельных потока общаются друг с другом через канал. В листинге 16-10 мы внесли некоторые изменения, которые докажут, что код в листинге 16-8 работает одновременно: порождённый поток теперь будет отправлять несколько сообщений и делать паузу на секунду между каждым сообщением.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

Листинг 16-10: Отправка нескольких сообщений и пауза между ними

На этот раз порождённый поток имеет вектор строк, которые мы хотим отправить основному потоку. Мы перебираем их, отправляя каждую сроку по отдельности и делаем паузу между ними, вызывая функцию thread::sleep со значением Duration равным 1 секунде.

В основном потоке мы больше не вызываем функцию recv явно: вместо этого мы используем rx как итератор. Для каждого полученного значения мы печатаем его. Когда канал будет закрыт, итерация закончится.

При выполнении кода в листинге 16-10 вы должны увидеть следующий вывод с паузой в 1 секунду между каждой строкой:

Got: hi
Got: from
Got: the
Got: thread

Поскольку у нас нет кода, который приостанавливает или задерживает цикл for в основном потоке, мы можем сказать, что основной поток ожидает получения значений из порождённого потока.

Создание нескольких отправителей путём клонирования передатчика

Ранее мы упоминали, что mpsc аббревиатура несколько производителей, один потребитель. Давайте подключим mpsc и расширим код в листинге 16-10 так, чтобы создать несколько потоков, которые все отправляют значения одному и тому же получателю. Мы можем сделать это путём клонирования передающей части канала, как показано в листинге 16-11:

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--
}

Листинг 16-11. Отправка нескольких сообщений от нескольких отправителей

На этот раз, прежде чем мы создадим первый порождённый поток, мы вызываем clone на передающей части канала. Это даст нам новый дескриптор отправки, который мы можем передать первому порождённому потоку. Мы передаём исходную отправляющую часть канала второму порождённому потоку. Это даёт нам два потока, каждый из которых отправляет разные сообщения принимающей части канала.

Когда вы запустите код, вывод должен выглядеть примерно так:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Вы можете увидеть эти значения в другом порядке; все зависит от вашей системы. Это то, что делает многопоточный параллелизм интересным и трудным. Если вы экспериментируете с thread::sleep задавая различные значения в разных потоках, то каждый прогон будет более не определённым и каждый раз будет создавать разные выходные данные.

Теперь, когда мы посмотрели, как работают каналы, давайте рассмотрим другой метод многопоточности.