Потоки
Запуск короткоживущего потока
Этот пример использует крейт crossbeam, который содержит структуры данных и функции для параллельного и многопоточного программирования. Scope::spawn
порождает новый поток со своей областью видимости, который гарантированно прекращается после возврата из области замыкания, переданной в функцию crossbeam::scope
. Это означает, что можем безопасно ссылаться на данные из вызванной функции.
Пример делит массив пополам и выполняет работу над его половинами в новых потоках.
extern crate crossbeam; fn main() { let arr = &[1, 25, -4, 10]; let max = find_max(arr); assert_eq!(max, Some(25)); } fn find_max(arr: &[i32]) -> Option<i32> { const THRESHOLD: usize = 2; if arr.len() <= THRESHOLD { return arr.iter().cloned().max(); } let mid = arr.len() / 2; let (left, right) = arr.split_at(mid); crossbeam::scope(|s| { let thread_l = s.spawn(|_| find_max(left)); let thread_r = s.spawn(|_| find_max(right)); let max_l = thread_l.join().unwrap()?; let max_r = thread_r.join().unwrap()?; Some(max_l.max(max_r)) }).unwrap() }
Передача данных между двумя потоками
Этот пример демонстрирует использование crossbeam-channel с одиночным производителем и одиночным потребителем (single producer - single consumer, SPSC).
Мы далее развиваем ex-crossbeam-spawn пример, используя crossbeam::scope
и Scope::spawn
для управления производящим потоком.
Данные передаются между двумя потоками через crossbeam_channel::unbounded
канал, то есть в канале может храниться неограниченное количество сообщений (которое фактически ограничено доступным объёмом памяти). Поток-производитель засыпает на полсекунды между посылкой сообщений.
extern crate crossbeam; extern crate crossbeam_channel; use std::{thread, time}; use crossbeam_channel::unbounded; fn main() { let (snd, rcv) = unbounded(); let n_msgs = 5; crossbeam::scope(|s| { s.spawn(|_| { for i in 0..n_msgs { snd.send(i).unwrap(); thread::sleep(time::Duration::from_millis(100)); } }); }).unwrap(); for _ in 0..n_msgs { let msg = rcv.recv().unwrap(); println!("Received {}", msg); } }
Поддержание изменяемого глобального состояния
В примере объявляется глобальное состояние используя lazy_static. lazy_static создаёт объект, доступный глобально через ссылку static ref
, которая требует Mutex
, чтобы сделать этот объект изменяемым (можно также использовать RwLock
).
Обёртка Mutex
гарантирует, что к этому состоянию не может быть одновременно получен доступ из множества потоков, таким образом предотвращая гонки.
Чтобы прочитать или записать в объект, защищённым Mutex
-ом, нужно получить специальный объект типа MutexGuard
, и осуществлять доступ через него.
#[macro_use] extern crate error_chain; #[macro_use] extern crate lazy_static; use std::sync::Mutex; error_chain!{ } lazy_static! { static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new()); } fn insert(fruit: &str) -> Result<()> { let mut db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?; db.push(fruit.to_string()); Ok(()) } fn main() -> Result<()> { insert("apple")?; insert("orange")?; insert("peach")?; { let db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?; db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item)); } insert("grape")?; Ok(()) }
Конкурентный расчёт хеш-суммы SHA256 iso файлов
Этот пример рассчитывает SHA256 для всех файлов с расширением iso в текущей директории. Пул потоков генерирует потоки в количестве, равному количеству ядер в системе найденные с помощью num_cpus::get
. Walkdir::new
итерируется по директории и вызывает execute
чтобы совершить операцию чтения файла и вычисления SHA1 хэш-суммы.
extern crate walkdir; extern crate ring; extern crate num_cpus; extern crate threadpool; use walkdir::WalkDir; use std::fs::File; use std::io::{BufReader, Read, Error}; use std::path::Path; use threadpool::ThreadPool; use std::sync::mpsc::channel; use ring::digest::{Context, Digest, SHA256}; // Проверка iso расширения fn is_iso(entry: &Path) -> bool { match entry.extension() { Some(e) if e.to_string_lossy().to_lowercase() == "iso" => true, _ => false, } } fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> { let mut buf_reader = BufReader::new(File::open(&filepath)?); let mut context = Context::new(&SHA256); let mut buffer = [0; 1024]; loop { let count = buf_reader.read(&mut buffer)?; if count == 0 { break; } context.update(&buffer[..count]); } Ok((context.finish(), filepath)) } fn main() -> Result<(), Error> { let pool = ThreadPool::new(num_cpus::get()); let (tx, rx) = channel(); for entry in WalkDir::new("/home/user/Downloads") .follow_links(true) .into_iter() .filter_map(|e| e.ok()) .filter(|e| !e.path().is_dir() && is_iso(e.path())) { let path = entry.path().to_owned(); let tx = tx.clone(); pool.execute(move || { let digest = compute_digest(path); tx.send(digest).expect("Could not send data!"); }); } drop(tx); for t in rx.iter() { let (sha, path) = t?; println!("{:?} {:?}", sha, path); } Ok(()) }
{{#include thread/threadpool-fractal.md}}