Многопоточность - DmitryGontarenko/usefultricks GitHub Wiki
Существует два вида многозадачности:
Многозадачность на основе процессов - это средство, позволяющее одновременно выполнять две или несколько программ на компьютере.
Многозадачность на основе потоков - позволяет программе выполнять несколько задач одновременно.
Многопоточная система в Java построена на основе класса Thread
, его методах и дополняющем его интерфейсе Runnable
.
Что бы создать новый поток исполнения, следует расширить класс Thread или же реализовать интерфейс Runnable. Класс Thread содержит ряд методов, помогающих управлять потоками:
getName()
- получить имя потока
getPriority()
- получить приоритет потока
isAlive()
- определяет, выполнятся ли поток
join()
- ожидает завершения потока
run()
- задает точку входа в поток исполнения
sleep()
- приостанавливает выполнение потока на заданное время
start()
- запускает поток исполнения, вызывая его метод run()
При запуске Java программы сразу же начинает выполняться один, главный поток. От этого потока порождаются все дочерние потоки. Зачастую он должен быть последним потоков, завершающим выполнение программы, поскольку в нем производятся различные завершающие действия. Несмотря на то, что главный поток создается автоматически при запуске программы, им можно управлять через объект класса Thread
, вызвав для этого метода currentThread()
. Этот метод возвращает ссылку на тот поток, из которого он был вызван. Получив ссылку на поток, можно управлять им таким же образом, как и любым другим потоком.
public static void main(String[] args) {
Thread thread = Thread.currentThread();
System.out.println("Текущий поток: " + thread);
thread.setName("My Thread");
System.out.println("Новое имя: " + thread);
// Output:
// Текущий поток: Thread[main,5,main]
// Новое имя: Thread[My Thread,5,main]
Если выводить экземпляр объекта Thread
на консоль, то можно увидеть: имя потока, его приоритет и имя группы.
Поток исполнения можно создать из любого объекта любого класса, реализующего интерфейс Runnable. Для реализации этого интерфейса должен должен быть переопределен метод run()
.
В теле этого метода определяется код, который и будет составлять новый поток исполнения. В методе run()
устанавливается точка входа в другой, параллельный главному, поток.
После создания класса, реализующего интерфейс Runnable, следует получить экземпляр объекта типа Thread. Для этого в классе Thread определен ряд конструкторов: public Thread(Runnable target, String name);
Runnable target
- обозначает экземпляр класса, реализующего интерфейс Runnable
String name
- имя нового потока
Созданный поток не запустится до тех пор, пока не будет вызван метод start()
класса Thread
. По сути в методе start()
вызывается метод run()
.
public class ThreadDemo {
public static void main(String[] args) {
new NewThread(); // создать новый поток
for (int i = 0; i < 5; i++) {
System.out.println("Главный поток: " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Главный поток завершен");
}
}
class NewThread implements Runnable {
public NewThread() {
Thread thread = new Thread(this, "Демонстрационный поток");
System.out.println("Дочерний поток создан:" + thread);
thread.start();
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println("Дочерний поток: " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Дочерний поток завершен");
}
}
// Output:
// Дочерний поток создан:Thread[Демонстрационный поток,5,main]
// Главный поток: 0
// Дочерний поток: 0
// Дочерний поток: 1
// Дочерний поток: 2
// Главный поток: 1
// Дочерний поток: 3
// Главный поток: 2
// Дочерний поток: 4
// Дочерний поток завершен
// Главный поток: 3
// Главный поток: 4
// Главный поток завершен
При вызове метода start()
, запускается метод run()
, что в свою очередь приводит к началу цикла for()
в дочернем потоке. После вызова start()
конструктор NewThread()
возвращает управление методу main()
. Возобновляя свое исполнение, главный поток заходит в свой цикл for()
. Далее потоки выполняются параллельно, вплоть до завершения своих циклов.
Еще один способ создать поток - объявить класс, расширяющий класс Thread
, а затем получить экземпляр этого класса.
В расширяющем классе, как и в предыдущем примере, должен быть определен метод run()
и вызван метод start()
. Если бы для примера выше, мы решили не реализовывать интерфейс Runnable
, а расширить класс Thread
, то код бы изменился таким образом:
class NewThread extends Thread {
public NewThread() {
super("Демонстрационный поток");
System.out.println("Дочерний поток создан:" + this);
start();
}
В классе Thread
определяется ряд методов, которые могут быть переопределены. И только метод run()
должен быть обязательно переопределен, но этот метод требуется и в том случае, если мы реализуем интерфейс Runnable
. Следовательно, если ни один из других методов не переопределяется, то лучше использовать интерфейс Runnable
. Кроме того, в Java нет множественного наследования и расширения класса Thread
накладывает определенный ограничения, в отличии от реализации интерфейсов - их можно реализовать сколько угодно.
Метод isAlive()
позволяет определить, был ли поток завершен и возвращает логическое значение true или false.
Метод join()
применяется чтобы дождаться завершения потока исполнения. Этот метод ожидает завершения того потока исполнения, для которого он был вызван.
public class DemoJoin {
public static void main(String[] args) {
NewThread thread1 = new NewThread("One");
NewThread thread2 = new NewThread("Two");
NewThread thread3 = new NewThread("Three");
System.out.println("Thread One started: " + thread1.thread.isAlive()); // true
System.out.println("Thread Two started: " + thread2.thread.isAlive()); // true
System.out.println("Thread Three started: " + thread3.thread.isAlive()); // true
try {
thread1.thread.join();
thread2.thread.join();
thread3.thread.join();
} catch (InterruptedException e) {
System.out.println("Main thread was interrupted");
}
System.out.println("Thread One started: " + thread1.thread.isAlive()); // false
System.out.println("Thread Two started: " + thread2.thread.isAlive()); // false
System.out.println("Thread Three started: " + thread3.thread.isAlive()); // false
System.out.println("Main thread completed");
}
}
class NewThread implements Runnable {
protected String threadname;
protected Thread thread;
public NewThread(String threadname) {
this.threadname = threadname;
thread = new Thread(this, threadname);
thread.start();
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
System.out.println(threadname + ": " + i);
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
System.out.println(threadname + " completed");
}
}
// Outout:
// One: 0
// Two: 0
// Three: 0
// One: 1
// Three: 1
// Two: 1
// Three: 2
// One: 2
// Two: 2
// Two: 3
// One: 3
// Three: 3
// Two: 4
// One: 4
// Three: 4
// Two completed
// One completed
// Three completed
// Main thread completed
После завершения потоков, метод join()
возвращает управление главному потоку. При этом, как видно из примера, сами потоки, до передачи управления, могут выполняться в произвольном порядке.
Что бы принять решение, когда разрешить исполнения потока - используется специальный Планировщик потоков.
Что бы установить или получить приоритет потока исполнения, для выбранного потока используется метод setPriotity(int level)
и getPriority()
. Аргумент level
обозначает новый уровень приоритета для потока, значение аргумента должно быть в пределах от Thread.MIN_PRIORITY
= 1 до Thread.MAX_PRIORITY
= 10. По умолчанию используется Thread.NORM_PRIORITY
= 5.
Если бы для примера выше, мы изменили для одного из потоков приоритет на более высокий, результат бы изменился:
...
thread3.thread.setPriority(Thread.MAX_PRIORITY);
thread1.thread.join();
thread2.thread.join();
thread3.thread.join();
...
// Outout:
// Three: 0
// Two: 0
// One: 0
// Three: 1
// Two: 1
// One: 1
// Three: 2
// Two: 2
// One: 2
// Three: 3
// Two: 3
// One: 3
// Three: 4
// One: 4
// Two: 4
Как видно из примера, теперь поток Three
всегда исполняется первым.
Когда два или более потока исполнения имеют доступ к одному совместно используемому ресурсу, они нуждаются в гарантии, что ресурс будет одновременно использован только одним потоком. Процесс, обеспечивающий такое поведение поток - называется синхронизацией. Ключом к синхронизации является понятие монитор.
Монитор - механизм управления, который "хранит" в себе только один поток выполнения. Как только поток исполнения войдет в монитор, все другие потоки исполнения должны ожидать до тех пор, пока тот не покинет монитор. Таким образом, монитор может служить для защиты общих ресурсов от одновременного использования более чем одним потоком.
В Java каждый объект имеет свой неявный монитор, который осуществляется автоматически, когда для этого объекта вызывается
синхронизированный метод. Когда поток исполнения находится в теле синхронизированного метода, ни один другой поток не может вызвать какой-нибудь синхронизированный метод для того же самого объекта.
Монитор - это механизм контроля одновременного доступа к объекту.
Синхронизировать код можно с помощью ключевого слова synchronize
. Что бы войти в монитор объекта, достаточно вызвать метод, объявленный с помощью модификатора доступа synchronize
. Когда поток исполнения оказывается в теле синхронизированного метода, все другие потоки, пытающиеся вызвать этот метод, вынуждены ждать.
Представим, что у нас есть класс Caller
, реализующий интерфейс Runnable
и в переопределенном методе run()
мы вызываем метод call()
класса Callme
. В методе main()
создаем 3 экземпляра объекта Caller
, передаем им в конструктор определенные аргументы и запускаем их потоки:
public class Synch {
public static void main(String[] args) {
Callme callme = new Callme();
Caller caller1 = new Caller("Добро пожаловать", callme);
Caller caller2 = new Caller("в синхронизированный", callme);
Caller caller3 = new Caller("мир!", callme);
try {
caller1.thread.join();
caller2.thread.join();
caller3.thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Caller implements Runnable {
String message;
Callme callme;
Thread thread;
public Caller(String message, Callme callme) {
this.message = message;
this.callme = callme;
thread = new Thread(this);
thread.start();
}
@Override
public void run() {
callme.call(message);
}
}
class Callme {
public void call(String message) {
try {
System.out.print("[" + message);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("]");
}
}
// Output:
// [Добро пожаловать[в синхронизированны[мир!]
// ]
// ]
Применение метода sleep()
приводит к смешанному выводу строк сообщения, т.к. в этом методе отсутствует механизм, предотвращающий одновременный вызов объекта из разных потоков.
Решением этой проблемы является применение ключевого слова synchronized
для нужного метода:
public synchronized void call(String message)
Благодаря этому, результат выполнения программы будет такой:
// Output:
// [Добро пожаловать]
// [в синхронизированный]
// [мир!]
Но опять же стоит отметить, что вызов потоков в определенной последовательности - не гарантирует вывод строк в том же порядке, для этого нужно устанавливать приоритет.
Если возникает случай, когда требуется синхронизировать доступ к объекту класса, в котором не используются синхронизированные методы или класс написан сторонним разработчиком, а его исходный код недоступен, тогда вызов такого объекта следует заключить в блок оператора synchronized
:
synchronized (ссылка_на_объект) {
// синхронизируемые операторы
}
Используем код из предыдущего примера, только на этот раз с применением оператора synchronized
:
@Override
public void run() {
synchronized (callme) {
callme.call(message);
}
}
В данном примере метод call()
объявлен без модификатора доступа synchronized
, но благодаря использованию оператора synchronized
, результат вывода остается верным, поскольку каждый поток исполнения ожидает завершения предыдущего потока.
В Java существуют методы для взаимодействия между потоками исполнения (вызваны они могут быть только из синхронизированного блока или метода):
-
wait()
- вынуждает вызывающий поток уступить монитор и перейти в состояние ожидания до тех пор, пока какой-нибудь другой поток исполнения не войдет в тот же монитор и не вызовет методnotify()
(или же по тайм-ауту, который передается как аргумент метода); -
notify()
- возобновляет исполнение потока, из которого был вызван методwait()
для того же самого объекта; -
notifyAll()
- возобновляет исполнение всех потоков, из которых был вызван методwait()
для того же самого объекта. Одному из этих потоков предоставляется доступ.
Важно, что бы методы wait()
и notify()
были вызван для одного объекта. Всегда желательно указывать конкретный объект, для которого вызываются эти методы, иначе вызов будет происходить для элемента this
.
Все эти методы объявлены в классе Object
, поэтому они доступны всем классам.
Рассмотрим на примере:
public class ExApplication {
public static void main(String[] args) throws InterruptedException {
WaitAndNotify wan = new WaitAndNotify();
Thread thread1 = new Thread(() -> {
wan.method1();
});
Thread thread2 = new Thread(() -> {
wan.method2();
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
}
class WaitAndNotify {
public void method1() throws InterruptedException {
synchronized (this) {
System.out.println("Поток 1 запущен");
this.wait(); // останавливаем поток
System.out.println("Поток 1 продолжил выполнение");
}
}
public void method2() throws InterruptedException {
Thread.sleep(2000);
synchronized (this) {
System.out.println("Поток 2 запущен");
this.notify(); // пробуждаем остановленный поток
}
}
}
Output:
Поток 1 запущен
Поток 2 запущен
Поток 1 продолжил выполнение
В данном примере первый поток вызвал метод wait()
, который позволил зайти в монитор объекта второму потоку, который в свою очередь вызвал метод notify()
, что "пробудило" первый поток и он продолжил свою работу.
Взаимная блокировка - особый тип ошибок в многозадачности, которая происходит в том случае, когда потоки исполнения имеют циклическую зависимость от пары синхронизированных объектов. Пример - один поток исполнения входит в монитор объекта A, а другой - в монитор объекта B. Если поток исполнения в объекте А попытается вызвать любой синхронизированный метод объекта В, он будет блокирован, как и предполагалось. Но если поток объекта В, в свою очередь, попытается вызвать синхронизированный метод объекта А, то этот поток будет ожидать вечно, поскольку для получения доступа к объекту А он должен снять свою блокировку с объекта В, чтобы первый поток исполнения мог завершиться.
С помощью метода interrupt()
можно вызвать прерывание потока. Вызов этого метода устанавливает у потока признак того, что он прерван и в этом случае возвращается true.
При это сам вызов этого метода не завершает поток, он только устанавливает статус прерывания. Этот статус можно проверить с помощью метода isInterrupted()
и уже после принимать какие-либо действия.
Рассмотрим на примере:
public class ExApplication {
public static void main(String[] args) throws InterruptedException {
Thread myThread = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Thread was interrupted");
break; // проверяем признак interrupt и выходим из цикла
}
System.out.println("process...");
}
});
myThread.start();
myThread.interrupt(); // устанавливаем статус прерывания
myThread.join();
}
}
Также следует учитывать, что если мы обрабатываем исключение InterruptedException в потоке, который хотим прервать - будет вызвано исключение.
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Thread was interrupted");
break;
}
System.out.println("process...");
}
Данный пример будет идентичен предыдущему.
Если нам нужна коллекция, которая будет работать с несколькими потоками, есть два варианта:
-
Synchronized collections - получаются из традиционных коллекций благодаря их "обертыванию" с помощью
Collections.synchronized()
- Concurrent collections - иназначально созданы для работы с многопоточностью
Synchronized collections достигают потоко-безопасности благодаря тому, что используют блокировку потоков через synchronized-блоки для всех методов коллекции. Т.е. работа с коллекцией будет доступна одновременно только одному потоку, после его завершения - другому и т.д. Производительность из за такого подхода получается достаточно низкая.
Давайте для начала рассмотрим, как будет вести себя обычная коллекция при работе с несколькими потоками.
Пример 1.
public static void main(String[] args) throws InterruptedException {
List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> target = new ArrayList<>();
Runnable runnable = () -> target.addAll(source);
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(target);
}
Мы создали две коллекции source
и target
, и в новом потоке пытаемся добавить все содержимое из коллекции source
в target
.
Далее запускаем созданный поток два раза, ожидается, что результат будет - [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
Но после нескольких запусков результат может быть, например, такой - [null, null, null, null, null, 1, 2, 3, 4, 5]
Следовательно, используя обычную коллекцию в многопоточном режиме - результат операции невозможно предугадать.
Что бы исправить ситуацию, воспользуемся синхронизированный обёрткой для второй коллекции:
List<Integer> synchTarget = Collections.synchronizedList(new ArrayList<>());
Теперь с данным листом в один и тот же момент времени может работать только один поток (даже если второй поток будет работать не с методом addAll()
, а с другими). Из за такого подхода страдает производительность, но зато результат выполнения мы всегда сможем предугадать.
Пример 2.
public static void main(String[] args) throws InterruptedException {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
Runnable runnable1 = () -> {
for (Integer integer : list) {
System.out.println("run1: " + integer);
}
};
Runnable runnable2 = () -> list.remove(1);
Thread thread1 = new Thread(runnable1);
Thread thread2 = new Thread(runnable2);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(list);
}
В этом примере мы пытаемся пройтись итератором по коллекции list
в одном потоке, а в другом потоке удалить один из элементов коллекции.
После запуска программы мы увидим ошибку:
run1: 0
run1: 1
Exception in thread "Thread-0" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at com.gome.kafka.temppkg.TempApp.lambda$main(TempApp.java:16)
at java.lang.Thread.run(Thread.java:748)
[0, 2, 3, 4, 5, 6, 7, 8, 9]
В этом случае не поможет даже синхронизированная обертка для коллекции:
List<Integer> synchList = Collections.synchronizedList(list);
После повторного запуска, мы, скорее всего, увидим ту же ошибку.
Дело в том, что итераторы подвержены сбоям в многопоточном приложении. Если один поток изменяет содержимое коллекции, а второй поток обрабатывает ее итератором, то это может вызвать исключение ConcurrentModificationException
.
Потому что в данном случае срабатывает блокировка только для метода remove()
, но блокировка на итерирование не ставится.
Что бы обезопасить приложение от такого исключения, необходимо полностью блокировать коллекцию на время перебора, т.е. синхронизировать блок с итерацией:
public static void main(String[] args) throws InterruptedException {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
List<Integer> synchList = Collections.synchronizedList(list); // синхронизировали коллекцию
Runnable runnable1 = () -> {
synchronized (synchList) { // добавили блоки синхронизации при итерации
for (Integer integer : synchList) {
System.out.println("run1: " + integer);
}
}
};
Runnable runnable2 = () -> synchList.remove(1);
Thread thread1 = new Thread(runnable1);
Thread thread2 = new Thread(runnable2);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(list);
}
Теперь все будет работать правильно и без исключений.
Одной из Concurrent-коллекции является ConcurrentHashMap.
ConcurrentHashMap
имплеминтирует интерфейс ConcurrentMap
, который в свою очередь наследуется от интерфейса Map
.
В отличии от обычной, синхронизированной коллекции (как в примере выше), в ConcurrentHashMap любое количество потоков может читать информацию без каких либо локов.
ConcurrentHashMap делит множество элементов, которые он хранит, на сегменты, сегменту соответствует каждый Бакет.
Сегмент - это элемент массива ConcurrentHashMap и поэтому несколько потоков могут одновременно изменять элементы в разных сегментах, но не в одном и том же. Т.е. с одним сегментом может работать только один поток - это называется сегмент-лок.
Таким образом, ConcurrentHashMap
, из за его сегментирования, работает намного эффективнее чем синхронизированный HashMap
.
Не принимает null в качестве ключа или значения.
Рассмотрим на примере.
public static void main(String[] args) throws InterruptedException {
Map<Integer, String> map = new ConcurrentHashMap<>();
map.put(1, "John");
map.put(2, "Kate");
map.put(3, "Steve");
System.out.println(map);
Runnable runnable1 = () -> {
for (Map.Entry<Integer, String> entry : map.entrySet()) {
System.out.println(entry.getKey() + " " + entry.getValue());
}
};
Runnable runnable2 = () -> map.put(4, "Sarah");
Thread thread1 = new Thread(runnable1);
Thread thread2 = new Thread(runnable2);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(map);
}
В этом примере мы запускаем одновременно два потока - один поток просто выводит значения, а второй поток - добавляет значения.
При использовании обычной коллекции мы бы получили исключение ConcurrentModificationException
, но не в случае с ConcurrentHashMap
.
Результат работы будет такой:
{1=John, 2=Kate, 3=Steve}
1 John
2 Kate
3 Steve
4 Sarah
{1=John, 2=Kate, 3=Steve, 4=Sarah}
Коллекции для работы с многопоточностью. Их следует использовать тогда, когда нужно добиться потокобезопасности, и при этом у вас небольшое количество операций по изменению элементов, но большое количество операций по их чтению.
Это обусловлено тем, что при каждом изменении элементов создается новая копия коллекции. Тем самым гарантируется, что при итерации по коллекции не будет исключения ConcurrentModificationException
.
Существует два вида таких коллекций - CopyOnWriteArrayList
и CopyOnWriteArraySet
.
Блокирующие очереди используются в тех случаях, когда нужно выполнить проверку каких-либо условий для продолжения потоками своей работы. Блокирующие очереди могут быть реализованы с помощью интерфейсов BlockingQueue
, BlockingDeque
и TransferQueue
.
BlockingQueue
- определяет блокирующую очередь и хранит элементы в порядке FIFO. Реализация данного интерфейса обеспечивает блокировку в двух случаях:
- при попытке получения элемента из пустой очереди;
- при попытке размещения элемента в полной очереди.
Когда поток пытается получить элемент из пустой очереди, то он переводится в состояние ожидания до тех пор, пока какой-либо другой поток не разместит элемент в очереди. Аналогично при попытке положить элемент в полную очередь - поток ожидает до тех пор, пока другой поток не заберет элемент из очереди, и таким образом, не освободит место в ней.
Существует такой паттерн, как consumer-producer (поставщик-потребитель) - это базовая реализация обмена данными между несколькими потоками. Поток-производитель отправляет объекты на обработку в буфер, потоки-потребители принимают и обрабатываю их. Если буфер пуст, потребитель ждет, если буфер полон - ждет производитель.
Одним из способов реализации такого паттерна является использование BlockingQueue
.
Рассмотрим на примере:
public class ExApplication {
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); // создана очередь с фиксированным размером
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
producer();
});
Thread thread2 = new Thread(() -> {
consumer();
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
// метод бесконечно записывает в очередь случайные значения
private static void producer() throws InterruptedException {
Random random = new Random();
while (true)
queue.put(random.nextInt(100));
}
// метод бесконечно извлекает значения из очереди
private static void consumer() throws InterruptedException {
while (true) {
System.out.println(queue.take());
System.out.println("Queue size is " + queue.size());
}
}
}
В данном примере использована очередь ArrayBlockingQueue
, которая имеет фиксированный размер при создании. Для вставки и извлечения данных используются методы put()
и take()
, которые позволяют блокировать потоки. Новые элементы вставляются в конец очереди, а извлекаются из головы очереди.
ExecutorService - это средство для управления и работы с потоками. С помощью него можно создавать пул потоков.
Рассмотрим на примере:
public class ExApplication {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2); // создается пул потоков
for (int i = 0; i < 5; i++)
executorService.submit(new Work(i)); // передаем задачи в работу
executorService.shutdown(); // выполняем потоки
executorService.awaitTermination(1, TimeUnit.DAYS); // ожидаем выполнение потоков
}
}
@AllArgsConstructor
class Work implements Runnable {
private int id;
@Override
public void run() {
System.out.println("Work " + id + " was completed");
}
}
Output:
Work 0 was completed
Work 1 was completed
Work 2 was completed
Work 3 was completed
Work 4 was completed
Рассмотрим подробнее:
С помощью метода Executors.newFixedThreadPool(2)
создаем фиксированный пул потоков, в данном случае он будет содержать 2 потока.
После чего, в метод executorService.submit(new Work(i))
поочередно передаем в пул 5 задач. Т.е. два наших созданных потока должны будут одновременно выполнять эти задачи.
Метод executorService.shutdown()
вызывает упорядоченное завершение работы, при которой отправленные задачи выполняются, а новые больше не принимаются.
Метод executorService.awaitTermination()
ожидает окончание потоков до того момента, как они завершат свою работу или не наступит установленный тайм-аут.
Интерфейс Callable<V>
схож с интерфейсом Runnable
- их методы необходимо реализовать для использовании в новом потоке.
Однако интерфейс Callable
параметризируется возвращаемым значением из метода call()
, а метод run()
интерфейса Runnable
не возвращает ничего.
Рассмотрим метод run()
на примере:
executorService.submit(new Runnable() {
@Override
public void run() {
// some logic
}
});
Метод run()
ничего не возвращает, соответственно мы не можем получить какое-либо возвращаемое значение из этого потока.
Единственное решение при использовании Runnable
- это запись нужных данных во внешнее поле.
Получить значения из потока можно с помощью интерфейса Future
, он должен быть параметризирован тем же типом, что и Callable
:
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() {
return 5;
}
});
executorService.shutdown();
Integer result = future.get(); // получаем значение потока
System.out.println(result); // 5
}
Получить значение из Future
помогает метод get()
. Данный метод будет ожидать окончание выполнения потока.
Похожим образом мы можем получать из потока исключение и обработать его, например:
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Random random = new Random();
int randomInt = random.nextInt(10);
if (randomInt < 5) {
throw new Exception("Value less than 5"); // выбрасываем исключение
}
return randomInt;
}
});
executorService.shutdown();
try {
Integer result = future.get(); // получаем значение потока
System.out.println(result);
} catch (ExecutionException e) { // ловим и обрабатываем исключение
Throwable cause = e.getCause();
System.out.println(cause.getMessage()); // Value less than 5
}
}
При реализации интерфейсов с помощью лямба-выражения, Java сама понимает, когда в лямбда-выражение нужно подставить интерфейс Runnable
, а когда Callable
- все зависит от наличия передаваемого значения:
executorService.submit(() -> {
int abs = getAbs(5);
return abs; // Используется интерфейс Callable<Integer>
});
executorService.submit(() -> {
System.out.println("hello"); // Используется Runnable
});
Habr. Обзор java.util.concurrent.*
Блокирующие очереди пакета concurrent