Пример: map-reduce

Rust позволяет очень легко распределить обработку данных между потоками, без головной боли, традиционно связанной с попыткой сделать это.

Стандартная библиотека предоставляет отличные примитивы для работы потоками из коробки. Они в сочетании с концепцией владения и правилами алиасинга в Rust, автоматически предотвращают гонки данных.

Правила алиасинга (одна уникальная ссылка на запись или много ссылок на чтение) автоматически не позволяет вам манипулировать состоянием, которое видно другим потокам. (Где синхронизация необходима, есть примитивы синхронизации, такие как mutex (мьютексы) или channel (каналы).)

В этом примере мы вычислим сумму всех цифр в блоке чисел. Мы сделаем это, разбив куски блока на разные потоки. Каждый поток будет суммировать свой крошечный блок цифр, и впоследствии мы будем суммировать промежуточные суммы, полученные каждым потоком.

Обратите внимание на то, что хоть мы и передаём ссылки через границы потоков, Rust понимает, что мы только передаём неизменяемые ссылки, которые можно только читать, и что из-за этого не может быть никакой небезопасности и гонок данных. Так как мы перемещаем (move) сегменты данных в поток, Rust также уверен, что данные будут жить до тех пор, пока поток не завершится, и висящих указателей не появится.

use std::thread;

// Это главный поток
fn main() {

    // Это данные, которые мы будем обрабатывать.
    // Мы посчитаем сумму всех чисел при помощи разделённого на потоки map-reduce алгоритма.
    // Каждый фрагмент, разделённый пробелами, будет обрабатываться в отдельном потоке.
    //
    // TODO: посмотрите, что случится, если вы добавите пробелов!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // Создадим вектор, который будет содержать созданные нам дочерние потоки.
    let mut children = vec![];

    /*************************************************************************
     * "Map" фаза
     *
     * Разделим наши данные на сегменты и запустим начальную обработку
     ************************************************************************/

    // Разделим наши данные на сегменты для индивидуального вычисления.
    // Каждый фрагмент будет ссылкой (&str) на данные.
    let chunked_data = data.split_whitespace();

    // Обойдём сегменты данных.
    // .enumerate() добавит в текущий цикл индекс элемента
    // и далее полученный кортеж "(index, element)" будет немедленно
    // "деструктурирован" на две переменные, "i" и "data_segment", при помощи
    // "деструктурирующего присваивания"
    for (i, data_segment) in chunked_data.enumerate() {
        println!("{} сегмент данных \"{}\"", i, data_segment);

        // Обработаем каждый сегмент данных в отдельном потоке
        //
        // `spawn()` вернёт ручку на новый поток,
        // которую мы ДОЛЖНЫ сохранить, чтобы иметь доступ к возвращённому значению
        //
        // Синтаксис 'move || -> u32' обозначает замыкание, которое:
        // * не имеет аргументов ('||')
        // * забирает владение захваченных переменных ('move')
        // * возвращает беззнаковое 32-битное целое число ('-> u32')
        //
        // Rust может вывести '-> u32' из самого замыкация,
        // так что мы можем его опустить.
        //
        // TODO: попробуйте удалить 'move' и посмотреть что получится
        children.push(thread::spawn(move || -> u32 {
            // Вычислим промежуточную сумму этого сегмента:
            let result = data_segment
                        // итерируемся по символам этого сегмента..
                        .chars()
                        // .. преобразуем текстовые символы в их числовые значения..
                        .map(|c| c.to_digit(10).expect("должно быть числом"))
                        // .. и суммируем получившийся итератор из чисел
                        .sum();

            // `println!` блокирует стандартный вывод, так что чередования текста не происходит
            println!("обработан сегмент {}, result={}", i, result);

            // "return" не обязателен, так как Rust "язык выражений" и
            // последнее выполненное выращение в каждом блоке автоматически становится значением этого блока.
            result

        }));
    }


    /*************************************************************************
     * Фаза "Reduce"
     *
     * Собираем наши промежуточные значения и объединяем их в конечные результат
     ************************************************************************/

    // собираем промежуточный результат каждого потока в новый вектор
    let mut intermediate_sums = vec![];
    for child in children {
        // собираем возвращаемое значение каждого дочернего потока
        let intermediate_sum = child.join().unwrap();
        intermediate_sums.push(intermediate_sum);
    }

    // Объединяем все промежуточные суммы в одну конечную сумму.
    //
    // Мы используем "turbofish" `::<>` чтобы подсказать `sum()` тип.
    //
    // TODO: попробуйте без turbofish, явно указывая тип final_result
    let final_result = intermediate_sums.iter().sum::<u32>();

    println!("Финальная сумма: {}", final_result);
}

Назначения

Не стоит позволять числу наших потоков быть зависимом от введённых пользователем данных. Что если пользователь решит вставить много пробелов? Мы действительно хотим создать 2000 потоков? Измените программу так, чтобы данные разбивались на ограниченное число блоков, объявленных статической константой в начале программы.

Смотрите также: