Потоки

Запуск короткоживущего потока

crossbeam-badge cat-concurrency-badge

Этот пример использует крейт crossbeam, который содержит структуры данных и функции для параллельного и многопоточного программирования. Scope::spawn порождает новый поток со своей областью видимости, который гарантированно прекращается после возврата из области замыкания, переданной в функцию crossbeam::scope. Это означает, что можем безопасно ссылаться на данные из вызванной функции.

Пример делит массив пополам и выполняет работу над его половинами в новых потоках.

extern crate crossbeam;

fn main() {
    let arr = &[1, 25, -4, 10];
    let max = find_max(arr);
    assert_eq!(max, Some(25));
}

fn find_max(arr: &[i32]) -> Option<i32> {
    const THRESHOLD: usize = 2;
  
    if arr.len() <= THRESHOLD {
        return arr.iter().cloned().max();
    }

    let mid = arr.len() / 2;
    let (left, right) = arr.split_at(mid);
  
    crossbeam::scope(|s| {
        let thread_l = s.spawn(|_| find_max(left));
        let thread_r = s.spawn(|_| find_max(right));
  
        let max_l = thread_l.join().unwrap()?;
        let max_r = thread_r.join().unwrap()?;
  
        Some(max_l.max(max_r))
    }).unwrap()
}

Передача данных между двумя потоками

crossbeam-badge cat-concurrency-badge

Этот пример демонстрирует использование crossbeam-channel с одиночным производителем и одиночным потребителем (single producer - single consumer, SPSC). Мы далее развиваем ex-crossbeam-spawn пример, используя crossbeam::scope и Scope::spawn для управления производящим потоком. Данные передаются между двумя потоками через crossbeam_channel::unbounded канал, то есть в канале может храниться неограниченное количество сообщений (которое фактически ограничено доступным объёмом памяти). Поток-производитель засыпает на полсекунды между посылкой сообщений.

extern crate crossbeam;
extern crate crossbeam_channel;

use std::{thread, time};
use crossbeam_channel::unbounded;

fn main() {
    let (snd, rcv) = unbounded();
    let n_msgs = 5;
    crossbeam::scope(|s| {
        s.spawn(|_| {
            for i in 0..n_msgs {
                snd.send(i).unwrap();
                thread::sleep(time::Duration::from_millis(100));
            }
        });
    }).unwrap();
    for _ in 0..n_msgs {
        let msg = rcv.recv().unwrap();
        println!("Received {}", msg);
    }
}

Поддержание изменяемого глобального состояния

lazy_static-badge cat-rust-patterns-badge

В примере объявляется глобальное состояние используя lazy_static. lazy_static создаёт объект, доступный глобально через ссылку static ref, которая требует Mutex, чтобы сделать этот объект изменяемым (можно также использовать RwLock). Обёртка Mutex гарантирует, что к этому состоянию не может быть одновременно получен доступ из множества потоков, таким образом предотвращая гонки. Чтобы прочитать или записать в объект, защищённым Mutex-ом, нужно получить специальный объект типа MutexGuard, и осуществлять доступ через него.

#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate lazy_static;

use std::sync::Mutex;

error_chain!{ }

lazy_static! {
    static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());
}

fn insert(fruit: &str) -> Result<()> {
    let mut db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;
    db.push(fruit.to_string());
    Ok(())
}

fn main() -> Result<()> {
    insert("apple")?;
    insert("orange")?;
    insert("peach")?;
    {
        let db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;

        db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item));
    }
    insert("grape")?;
    Ok(())
}

Конкурентный расчёт хеш-суммы SHA256 iso файлов

threadpool-badge num_cpus-badge walkdir-badge ring-badge cat-concurrency-badgecat-filesystem-badge

Этот пример рассчитывает SHA256 для всех файлов с расширением iso в текущей директории. Пул потоков генерирует потоки в количестве, равному количеству ядер в системе найденные с помощью num_cpus::get. Walkdir::new итерируется по директории и вызывает execute чтобы совершить операцию чтения файла и вычисления SHA1 хэш-суммы.

extern crate walkdir;
extern crate ring;
extern crate num_cpus;
extern crate threadpool;

use walkdir::WalkDir;
use std::fs::File;
use std::io::{BufReader, Read, Error};
use std::path::Path;
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
use ring::digest::{Context, Digest, SHA256};

// Проверка iso расширения
fn is_iso(entry: &Path) -> bool {
    match entry.extension() {
        Some(e) if e.to_string_lossy().to_lowercase() == "iso" => true,
        _ => false,
    }
}

fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> {
    let mut buf_reader = BufReader::new(File::open(&filepath)?);
    let mut context = Context::new(&SHA256);
    let mut buffer = [0; 1024];

    loop {
        let count = buf_reader.read(&mut buffer)?;
        if count == 0 {
            break;
        }
        context.update(&buffer[..count]);
    }

    Ok((context.finish(), filepath))
}

fn main() -> Result<(), Error> {
    let pool = ThreadPool::new(num_cpus::get());

    let (tx, rx) = channel();

    for entry in WalkDir::new("/home/user/Downloads")
        .follow_links(true)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| !e.path().is_dir() && is_iso(e.path())) {
            let path = entry.path().to_owned();
            let tx = tx.clone();
            pool.execute(move || {
                let digest = compute_digest(path);
                tx.send(digest).expect("Could not send data!");
            });
        }

    drop(tx);
    for t in rx.iter() {
        let (sha, path) = t?;
        println!("{:?} {:?}", sha, path);
    }
    Ok(())
}

{{#include thread/threadpool-fractal.md}}