Тема 33. Работа с потоками ForkJoinPool - BelyiZ/JavaCourses GitHub Wiki

Содержание:

  1. Принцип работы ForkJoinPool
  2. Модель ForkJoin
  3. Внутреннее устройство ForkJoinPool
    1. Алгоритм "Кражи работы"
  4. Создание экземпляра ForkJoinPool
  5. ForkJoinTask
    1. RecursiveAction
    2. RecursiveTask
  6. Отправка задач в ForkJoinPool
  7. Рекомендации в случае использования ForkJoinPool
  8. Случаи использования ForkJoinPool
  9. Список литературы/курсов

ForkJoinPool - Это детализированный фреймворк для эффективного распараллеливания выполнения задач.

Принцип работы ForkJoinPool

Дизайн ForkJoinPool основан на алгоритме "Разделяй и властвуй". Это парадигма разработки алгоритмов, заключающаяся в рекурсивном разбиении решаемой задачи на две или более подзадачи того же типа, но меньшего размера, и комбинировании их решений для получения ответа к исходной задаче; разбиения выполняются до тех пор, пока все подзадачи не окажутся элементарными.

То есть каждая задача разбивается на подзадачи по максимуму, затем они выполняются параллельно, и как только все из них завершаются, происходит объединение результатов.

Фреймворк сначала “разветвляется” , рекурсивно разбивая задачу на более мелкие независимые подзадачи, пока они не станут достаточно простыми для асинхронного выполнения.

После этого начинается часть “соединение” , в которой результаты всех подзадач рекурсивно объединяются в один результат, или в случае задачи, которая возвращает void, программа просто ждет, пока не будет выполнена каждая подзадача.

Для обеспечения эффективного параллельного выполнения платформа fork/join использует пул потоков, называемый ForkJoinPool, который управляет рабочими потоками типа ForkJoinWorkerThread.

Преимущество:

За счет выполнения задач параллельным образом повышается производительность. Но данный эффект достигается только с большими задачами (когда необходимо обработать большие массивы данных)

Модель ForkJoin

Модель fork-join это метод, в котором мы разделяем каждую задачу fork, а затем ждем объединения join всех получившихся подзадач и получаем результат.

При этом каждая задача разделяется всякий раз, когда вызывается fork. Точно так же, когда все задачи завершаются, они объединяются посредством join для получения конечного результата.

Внутреннее устройство ForkJoinPool

ForkJoinPool, как и любой пул потоков, состоит из заранее заданного числа потоков или рабочих групп. Это реализация ExecutorService, которая управляет рабочими потоками и предоставляет нам инструменты для получения информации о состоянии и производительности пула потоков. Рабочие потоки могут выполнять только одну задачу за раз, но ForkJoinPool не создает отдельный поток для каждой отдельной подзадачи.

Когда создается новый ForkJoinPool, уровень параллелизма по умолчанию (количество потоков) будет равен количеству доступных процессоров в системе. Их число возвращается методом Runtime.availableProcessors().

Обратим внимание: в настоящее время, когда многократно задействована виртуализация (облачные виртуальные машины и Docker), у JVM зачастую не будет столько доступных процессоров, сколько их есть на базовой машине.

Возможно создать свой собственный ForkJoinPool, указав, сколько потоков необходимо. Обратим внимание: пул с числом потоков, превосходящим количество доступных процессоров, для задач с интенсивным использованием процессора полезен не будет. Однако, если стоят задачи интенсивного ввода-вывода (то есть им придется часто ждать завершения операций ввода-вывода), большой пул может пригодиться.

Каждый рабочий поток имеет собственную двухстороннюю рабочую очередь deques типа WorkQueue, в которой хранятся задачи.

Таким образом, каждый из рабочих потоков продолжает сканировать доступные для выполнения подзадачи. Основная цель в том, чтобы как можно больше нагружать рабочие потоки и максимизировать использование ядер процессора. Поток блокируется только тогда, когда нет доступных для выполнения подзадач.

В случае если рабочий поток не может найти задачи для запуска в своей собственной очереди он будет пытаться “украсть” задачи у тех процессоров, которые загружены сильнее.

##Алгоритм "Кражи работы"

Чтобы свести к минимуму конкуренцию за задачу в одно и то же время между владельцем очереди и “похитителем” и исключить вариант, что они будут мешать друг другу, как владелец очереди, так и “похитители” захватывают задачи из разных частей очереди.

Для вставки задач в очередь используется метод push(), а владелец очереди захватывает задачу, вызывая метод pop(). Таким образом, владельцем очереди сама очередь используется в качестве стека, и элементы извлекаются из его верхней части.

Здесь представлен метод LIFO Last In, First Out (“последним вошел первым вышел”).

Основная причина использования LIFO - повышение производительности. Всегда, выбирая самую последнюю задачу, мы увеличиваем шансы на то, что ресурсы задачи все еще будут распределены в кэшах процессора, а это значительно повысит производительность. Это называют локальностью ссылок.

С другой стороны, когда у рабочего потока заканчиваются задачи, он всегда будет забирать задачи из хвоста очереди другого воркера, вызывая метод poll().

В этом случае мы уже следуем подходу FIFO First In, First Out (“первым вошел первым вышел”). Это в основном служит для уменьшения конкуренции, необходимой для синхронизации как владельцу очереди, так и “похитителю”.

Еще одна причина в том, что из-за природы самих делимых задач более старые задачи в очереди, скорее всего, будут больше по объему, поскольку еще не подверглись разделению.

Методы push и pop вызываются только владельцем очереди, а метод poll вызывается только процессором, пытающимся “украсть” работу у другого процессора.

Методы push и pop это операции CAS (Compare-and-Swap, сравнение с обменом) без ожидания, так что они весьма эффективны. Однако метод poll не всегда свободен от блокировки. Он блокируется в тех случаях, когда очередь почти пуста, поскольку требуется некоторая синхронизация для гарантии, что только владелец или похититель выберет данную задачу, но не оба сразу.

Создание экземпляра ForkJoinPool

В Java 8 наиболее удобным способом получить доступ к экземпляру ForkJoinPool является использование его статического метода commonPool (). Как следует из названия, это обеспечит ссылку на общий пул, который является пулом потоков по умолчанию для каждой ForkJoinTask.

Согласно документации Oracle , использование предопределенного общего пула снижает потребление ресурсов, поскольку это препятствует созданию отдельного пула потоков для каждой задачи.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Получить к нему доступ возможно следующим образом:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

ForkJoinTask

ForkJoinTask является базовым типом для задач, выполняемых внутри ForkJoinPool.

Благодаря фреймворку ForkJoinPool параллельные потоки Java могут работать очень эффективно, оптимизируя использование доступных ядер. Но в случае если для обработки нужного типа задачи неприменимы потоки Java возможно создавать свои собственные “делимые” задачи, просто расширяя класс ForkJoinTask.

ForkJoinTask - это Java-класс, который ведет себя аналогично потоку Java, но он гораздо более легкий, в основном потому, что ему не приходится поддерживать свой собственный стек времени выполнения или счетчики программ.

Существует три подтипа ForkJoinTask: RecursiveAction, RecursiveTask и CountedCompleter. Выбор того или иного подтипа будет зависеть от типа задач, которые вы пишете. Чтобы понять, какой из них лучше всего соответствует вашим потребностям изучите документацию, которая приведена в списке литературы.

На практике расширяют один из двух подклассов ForkJoinTask: RecursiveAction - для void задач, RecursiveTask - для задач, возвращающих значение. У них обоих есть абстрактный метод compute(), в котором определена логика задачи.

Отметим, что лямбда-выражениями пользоваться при работе с ForkJoinTask невозможно.

Обратим внимание, что ForkJoinPool позволяет отправлять не только ForkJoinTasks, но также вызываемые (Callable) или выполняемые (Runnable) задачи, поэтому возможно применять ForkJoinPool таким же образом, как и другие существующие исполнители. Единственное отличие в том, что задача не будет разделяться сама по себе, но возможно извлечь выгоду из повышения производительности "кражи работы", если будет отправлено несколько задач и у некоторых потоков будет меньше загрузка, чем у других.

RecursiveAction

В приведенном ниже примере единица работы, подлежащая обработке, представлена строкой, называемой рабочая нагрузка. Программа вводит данные в верхний регистр и регистрирует их.

Чтобы продемонстрировать поведение разветвления фреймворка, пример разбивает задачу, если рабочая нагрузка .length() превышает заданный порог с помощью метода createSubtask() .

Строка рекурсивно разделяется на подстроки, создавая Пользовательские экземпляры RecursiveTask , основанные на этих подстроках.

В результате метод возвращает List RecursiveAction.

Список передается в ForkJoinPool с помощью метода invokeAll() :


public class CustomRecursiveAction extends RecursiveAction {
    private String workload = "";
    private static final int THRESHOLD = 4;
    private static Logger logger =
            Logger.getAnonymousLogger();
    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }
    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
}
private List createSubtasks() {
        List subtasks = new ArrayList<>();
        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());
        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));
        return subtasks;
}
private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
                + Thread.currentThread().getName());
    }
}

Этот шаблон можно использовать для разработки собственных RecursiveAction классов. Для необходимо создать объект, представляющий общий объем работы, выбрать подходящий порог, определить метод разделения работы и определить метод выполнения работы.

RecursiveTask

Для задач, возвращающих значение, логика здесь аналогична, за исключением того, что результат для каждой подзадачи объединяется в один результат:

public class CustomRecursiveTask extends RecursiveTask {
    private int[] arr;
    private static final int THRESHOLD = 20;
    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }
    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
                    .stream()
                    .mapToInt(ForkJoinTask::join)
                    .sum();
        } else {
            return processing(arr);
        }
    }
private Collection createSubtasks() {
        List dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
                Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }
private Integer processing(int[] arr) {
        return Arrays.stream(arr)
                .filter(a -> a > 10 && a < 27)
                .map(a -> a * 10)
                .sum();
    }
}

В этом примере работа представлена массивом, хранящимся в поле art класса Custom RecursiveTask. Метод create Subtasks() рекурсивно делит задачу на более мелкие части работы, пока каждая часть не станет меньше порогового значения. Затем метод invokeAll() отправляет подзадачи в общий пул и возвращает список Future .

Для запуска выполнения для каждой подзадачи вызывается метод join (). Метод sum() используется в качестве представления объединения вложенных результатов в конечный результат.

Отправка задач в ForkJoinPool

Для отправки задач в пул потоков можно использовать несколько подходов.

Метод submit() или execute () (их варианты использования одинаковы):

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

Метод invoke() разветвляет задачу и ожидает результата, и не требует ручного соединения:

int result = forkJoinPool.invoke(customRecursiveTask);

Метод invokeAll() является наиболее удобным способом отправки последовательности ForkJoinTasks в ForkJoinPool. Он принимает задачи в качестве параметров (две задачи, varargs или коллекция), а затем возвращает коллекцию объектов Feature в том порядке, в котором они были созданы.

Кроме того, возможно использовать отдельные методы fork() и join(). Метод fork() отправляет задачу в пул, но не запускает ее выполнение. Для этой цели необходимо использовать метод join(). В случае RecursiveAction функция join() возвращает только null ; для RecursiveTask возвращает результат выполнения задачи:

customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

В нашем примере RecursiveTask мы использовали метод invokeAll() для отправки последовательности подзадач в пул. Ту же работу можно выполнить с помощью fork() и join() , что имеет последствия для упорядочения результатов.

Чтобы избежать путаницы, обычно рекомендуется использовать метод invokeAll() для отправки более одной задачи в ForkJoinPool.

Рекомендации в случае использования ForkJoinPool

  1. Используйте как можно меньше пулов потоков – в большинстве случаев лучшим решением является использование одного пула потоков для каждого приложения или системы.
  2. Используйте пул общих потоков по умолчанию, если конкретная настройка не требуется.
  3. Используйте разумный порог для разделения ForkJoinTask на подзадачи.
  4. Избегайте каких-либо блокировок в вашей Forkjointask.

Случаи использования ForkJoinPool

Использование фреймворка fork/join может ускорить обработку больших задач, в случае использования фреймворка в небольших задачах производительность не увеличится.

Список литературы/курсов

  1. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html
  2. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinTask.html - описание методов ForkJoinTask
  3. https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/RecursiveAction.html
  4. https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/RecursiveTask.html
  5. https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CountedCompleter.html

Тема 32. Конкуренция | Оглавление | Тема 34. Введение в Spring