자바 Concurrent 라이브러리 - swsuh93/study GitHub Wiki
최대 지정한 개수 만큼의 쓰레드를 가질 수 있는 쓰레드 풀을 생성, 실제 생성되는 객체는 ThreadPoolExecutor 객체이다.
항상 일정한 쓰레드 개수를 유지한다. 쓰레드가 유휴상태이더라도 제거하지 않고 유지한다.
다만 작업도중 비정상적으로 쓰레드가 종료하는 경우에는 쓰레드를 추가로 생성하며, nThreads 개수보다 1개 더 생길 수도 있다.
지정한 개수만큼 쓰레드가 유지되는 스케줄 가능한 쓰레드 풀을 생성한다. 실제 생성되는 객체는 ScheduledThreadPoolExecutor 객체이다.
하나의 쓰레드만 사용하는 ExecutorService를 생성한다.
항상 1개의 쓰레드만 동작한다. 따라서 쓰레드가 동작중일 경우 나머지 작업은 모든 큐에서 대기하며, 순서대로 하나씩 실행된다. 만약 비정상적으로 쓰레드가 종료되는 경우, 새로 쓰레드를 생성하고 남은 작업을 계속 한다.
하나의 쓰레드만 사용하는 ScheduledExecutorService를 생성한다. 일정시간 이후에 실행되거나 주기적으로 작업을 실행할 수 있으며. 쓰레드의 수가 고정되어있는 형태의 Executor.Timer 클래스의 기능과 유사하다.
필요할 때 마다 쓰레드를 생성하는 쓰레드 풀을 생성한다. 이미 생성된 쓰레드의 경우 재사용된다.
실제 생성되는 객체는 ThreadPoolExecutor 객체이다. 쓰레드 개수에 제한이 없이 필요한 경우 계속 쓰레드 수가 증가한다. 다만 일정 시간(60초)동안 사용하지 않는(idle) 쓰레드는 종료된다.
필요없는 쓰레드를 제거하므로 서버 리소스(memory)는 적게 사용하지만, 쓰레드 생성과 삭제를 반복하므로 작업 부하가 불규칙적인 경우 비효율적이다.
CopyOnWrite가 말해주는 것처럼 read(select) 시는 아무런 동기화 문제가 없기 때문에 놔두고 변경이 일어날 경우 객체를 clone해서 다루자는 전략. 따라서 읽기 행위가 많이 일어나는 곳에서 사용하기 좋다.
보통 생산자 - 소비자 패턴에서 활용되는 큐로 많이 사용된다. 이 큐는 멀티쓰레드 환경에서 대표할만한 컬렉션이다.
소비자가 꺼내어 사용할동안 생산자는 멈춰있고, 생산자가 넣을동안 소비자는 멈춰있어야한다.
ConcurrentHashMap은 Map의 일부에만 Lock을 걸기 때문에 HashTable과 synchronized Map 보다 효율적이다.
함수 안의 로컬 변수는 쓰레드마다 고유하게 가질 수 있다. 쓰레드마다 고유의 변수를 해당 함수의 안 뿐만 아니라, 클래스의 정적 멤버등으로 생성하여 각각의 쓰레드가 다른 곳에서 사용하고 싶을 때 사용한다.
ThreadLocal은 한 쓰레드에서 실행되는 코드가 동일한 객체를 사용할 수 있도록 해주기 때문에 쓰레드와 관련된 코드에서 파라미터를 사용하지 않고 객체를 전파하기 위한 용도로 주로 사용되며, 주요 용도는 다음과 같다.
- 사용자 인증 정보 전파 : Spring Security에서는 ThreadLocal을 이용해서 사용자 인증 정보를 전파한다.
- Transaction Context 전파 : 트랜잭션 매니저는 트랜잭션 컨텍스트를 전파하는데 ThreadLocal을 사용한다.
- Thread Safe 해야 하는 데이터 보관
주의 사항 : 쓰레드 풀 환경에서 ThreadLocal을 사용하는 경우 ThreadLocal 변수에 보관된 데이터의 사용이 끝나면 반드시 해당 데이터를 삭제해 주어야한다. 그렇지 않을 경우 재사용되는 쓰레드가 올바르지 않은 데이터를 참조할 수 있다.
모든 쓰레드(task)가 종료되면, 호출 될 필요가 있는 곳에 사용된다. 관련 쓰레드들의 이벤트들을 감지하기 위해 사용한다.
모든 쓰레드가 종료가 아니라 블럭되면, 호출 될 필요가 있는 곳에 사용된다.
다른 쓰레드를 기다리기 위해 사용한다고 볼 수 있다.
하마(HAMA)라는 분산 머신러닝에 사용되는 오픈소스가 있는데, BSP 알고리즐을 사용한다. BSP(Binary Space Partitioning)란 각각의 컴퓨터가 일을 하고, 자신의 일이 끝나면 멈춰있게 된다, 모든 컴퓨터들이 멈춰있게되면 서로 커뮤니케이션하는 구조인데, 전체 컴퓨터들의 분산 락을 걸어주는 것과 비슷하다. 분산 락은 Zookeeper라는 오픈소스를 사용한다.
서로 다른 쓰레드에서 각각의 데이터(컬렉션을 통채로도)를 주고 받을 수 있게 한다.
package test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
public class ExchangerTest {
private static final int FULL = 5;
private static final int COUNT = FULL * 2;
private static final Random random = new Random();
private static volatile int sum = 0;
private static Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
private static CountDownLatch stopLatch = new CountDownLatch(2);
private static List<Integer> initiallyEmptyBuffer;
private static List<Integer> initiallyFillBuffer;
private static class FillingLoop implements Runnable {
public void run() {
List<Integer> currentBuffer = initiallyFillBuffer;
try {
for (int i = 0; i < COUNT; i++) {
if (currentBuffer == null)
break;
Integer item = random.nextInt(100);
System.out.println("Item Added: " + item);
currentBuffer.add(item);
if (currentBuffer.size() == FULL) {
currentBuffer = exchanger.exchange(currentBuffer);
}
}
} catch (InterruptedException ex) {
System.out.println("Bad exchange on filling side");
}
stopLatch.countDown();
}
}
private static class EmptyingLoop implements Runnable {
public void run() {
List<Integer> currentBuffer = initiallyEmptyBuffer;
try {
for (int i = 0; i < COUNT; i++) {
if (currentBuffer == null)
break;
if (currentBuffer.isEmpty()) {
currentBuffer = exchanger.exchange(currentBuffer);
}
Integer item = currentBuffer.remove(0);
System.out.println("Item Got: " + item);
sum += item.intValue();
}
} catch (InterruptedException ex) {
System.out.println("Bad exchange on emptying side");
}
stopLatch.countDown();
}
}
public static void main(String args[]) {
initiallyEmptyBuffer = new ArrayList<Integer>();
initiallyFillBuffer = new ArrayList<Integer>();
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
try {
stopLatch.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("Sum of all items is.... " + sum);
}
}모든 쓰레드가 종료되면, 호출 될 필요가 있는곳에 사용된다.
package test;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class PreLoader {
private final FutureTask<List<String>> task1
= new FutureTask<List<String>>(new MyCallable());
private final FutureTask<List<String>> task2
= new FutureTask<List<String>>(new MyCallable());
private final FutureTask<List<String>> task3
= new FutureTask<List<String>>(new MyCallable());
private final FutureTask<List<String>> task4
= new FutureTask<List<String>>(new MyCallable());
private final FutureTask<List<String>> task5
= new FutureTask<List<String>>(new MyCallable());
ExecutorService es = Executors.newFixedThreadPool(5);
public void testGO() throws Exception{
es.submit(task1);
Thread.sleep(300);
es.submit(task2);
Thread.sleep(300);
es.submit(task3);
Thread.sleep(300);
es.submit(task4);
Thread.sleep(300);
es.submit(task5);
PrintTest(task2);
PrintTest(task5);
PrintTest(task4);
PrintTest(task3);
PrintTest(task1);
Thread.sleep(20000);
es.shutdown();
} //End Of testGo
private void PrintTest(FutureTask<List<String>> tempTask){
List<String> k = null;
try {
k = tempTask.get();
} catch (InterruptedException e) {
System.out.println("Exception : "+e.getMessage());
} catch (ExecutionException e) {
System.out.println("Exception : "+e.getMessage());
} //End Of try
for(String l : k){
System.out.println(l);
}
} //End Of printTest
public static void main(String[] args) {
PreLoader preloader = new PreLoader();
try {
preloader.testGO();
} catch (Exception e) {
System.out.println(e.getMessage());
} //End Of try
} //End Of main
} //End Of Class
class MyCallable implements Callable<List<String>>{
@Override
public List<String> call() throws Exception {
List<String> ret = new Vector<String>();
for (int i = 0; i < 10; i++) {
String temp = "안녕하세요 " + Thread.currentThread() + " 번 입니다.";
Thread.sleep(300);
ret.add(temp);
}
return ret;
}
}package test;
import java.util.Random;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
private static final Random rd = new Random(10000);
static class Log {
public static void debug(String strMessage) {
System.out.println(Thread.currentThread().getName() + " : " + strMessage);
}
}
class SemaphoreResource extends Semaphore {
private static final long serialVersionUID = 1L;
public SemaphoreResource(final int permits) {
super(permits);
}
public void use() throws InterruptedException {
acquire(); // 세마포어 리소스 확보
try {
doUse();
} finally {
release(); // 세마포어 리소스 해제
Log.debug("Thread 종료 후 남은 permits: " + this.availablePermits());
}
}
protected void doUse() throws InterruptedException {
// 임의의 프로그램을 실행하는데 거리는 가상의 시간
int sleepTime = rd.nextInt(500);
Thread.sleep(sleepTime); // 런타임 시간 설정
Log.debug(" Thread 실행..................." + sleepTime);
/** something logic **/
}
}
class MyThread extends Thread {
private final SemaphoreResource resource;
public MyThread(String threadName, SemaphoreResource resource) {
this.resource = resource;
this.setName(threadName);
}
@Override
public void run() {
try {
resource.use();
} catch (InterruptedException e) {
} finally { }
}
}
public static void main(String... s) {
System.out.println("Test Start...");
SemaphoreResource resource = new SemaphoreTest().new SemaphoreResource(4);
for (int i = 0; i < 20; i++) {
new SemaphoreTest().new MyThread("Thread-" + i, resource) .start();
}
}
}